如何在Spark Graphframes中检测循环?

bvpmtnay  于 2023-08-06  发布在  Apache
关注(0)|答案(3)|浏览(126)

这里是一个Spark Graphframes df表示一个有向图,这个图中可能有一些圈。如何检测Graphframe中的循环?
例如,这里有一个图表

| src | dst |
| --- | --- |
| 1   | 2   |
| 2   | 3   |
| 3   | 4   |
| 3   | 1   |
| 4   | 1   |

字符串
这个图中的循环应该是{1,2,3}和{1,2,3,4}。

flseospp

flseospp1#

你可以使用BFS算法来寻找图中的圈

yqkkidmi

yqkkidmi2#

我对“Spark图文框”一无所知
希望您会发现它很有用,下面是如何修改BFS来查找周期:
在标准的BFS算法中,代码会跟踪哪些顶点以前被访问过。当搜索当前顶点的可达邻居时,跳过先前访问过的顶点。
在为寻找循环而修改的BFS中,遇到先前访问过的顶点可能会发生,因为存在循环。
为了检查这一点,应用Dijsktra来查找从当前顶点开始,经过图的其余部分,并返回到先前访问过的顶点的最短路径。如果这样的路径存在,那么它就是一个循环。
下面是一个例子


的数据
这是算法的基本要素,但需要处理一些重要的细节:

  • 相同的循环可以被检测多次。需要代码来检查新发现的循环是否是新的
  • 多重图(节点对之间的多条边)
  • 具有多个分量的图
  • 无向图。(您的问题指定了有向图,因此您可以不必测试无向图。然而,对算法的另一个修改也将处理这些)

也许一个实现这一点的C++代码的链接会有帮助?

7y4bm7vi

7y4bm7vi3#

虽然GraphFrames本身可能无法完全为您的任务提供开箱即用的必要功能,但将其与NetworkX和PandasUDF结合使用被证明是一种有效的解决方案。首先,让我们来研究一下NetworkX的功能,尤其是与您的示例相关的功能。
示例图的绘制:


的数据
Json的算法是NetworkX中simple_cycles函数的基础,与其他基于DFS修改的算法(source)相比,它具有更好的时间复杂度。
在NetworkX中查找循环的代码:

import pandas as pd
import networkx as nx

df_edges = pd.DataFrame({
    'src': [1, 2, 3, 3, 4],
    'dst': [2, 3, 4, 1, 1]
})
# Create a directed graph from the dataframe
G = nx.from_pandas_edgelist(df_edges, source='src', target='dst', create_using=nx.DiGraph())
# Find cycles
cycles = list(nx.simple_cycles(G))
print(cycles) #output: [[1, 2, 3], [1, 2, 3, 4]]

字符串
NetworkX函数simple_cycles显然提供了所需的功能。然而,考虑到潜在的可扩展性问题和在Spark生态系统中运行的需求,寻求以并行方式运行的解决方案是有益的。这就是PandasUDF(矢量化UDF)实用程序的亮点所在。为了制定一个可扩展和可推广的解决方案,我们的第一步是执行连接组件操作。GraphFrames方便地提供了这种功能,如下所示:

from graphframes import *
g = GraphFrame(df_edges)  
result = g.connectedComponents()


从connected components函数获得输出后(通常为[node,component id]格式),您可以使用此component id扩展原始的edge DataFrame。这将导致Spark DataFrame结构化为[src,dst,component]。
为了简洁起见,我将在示例的后续步骤中手动生成这样一个Spark DataFrame。为了说明循环查找函数在不同连接组件上的并行化能力,我还将一个附加子图的边合并到边列表中。
假设这是扩展的边缘列表

df_edges = pd.DataFrame({
    'src': [1, 2, 3, 3, 4,5,6,7],
    'dst': [2, 3, 4, 1, 1,6,7,5],
    'component' : [1,1,1,1,1,2,2,2]
})

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Convert pandas example DataFrame to Spark DataFrame
# this is in place of the processed output 
# derived from both the original DataFrame 
# and the connected components output.
spark_df_edges = spark.createDataFrame(df_edges)


以下是扩展图的可视化,由两个不同的连接组件组成:



这就是使用.show()时扩展的边列表(现在与组件ID集成)的显示方式

+---+---+---------+
|src|dst|component|
+---+---+---------+
|  1|  2|        1|
|  2|  3|        1|
|  3|  4|        1|
|  3|  1|        1|
|  4|  1|        1|
|  5|  6|        2|
|  6|  7|        2|
|  7|  5|        2|
+---+---+---------+


接下来,我们定义一个Pandas UDF,它可以应用于每组连接的组件。除了查找循环之外,该函数还旨在返回有用的信息,例如找到的循环计数和构成每个组件的每个循环的边列表:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, IntegerType,StringType
import json

schema = StructType([
    StructField('component', IntegerType()),
    StructField('no_of_cycles', IntegerType()),
    StructField('cyclelist', StringType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def find_cycles(pdf):
    G = nx.from_pandas_edgelist(pdf, source='src', target='dst', create_using=nx.DiGraph())
    cycles = list(nx.simple_cycles(G))
    cyclelist = json.dumps(cycles)
    num_cycles = len(cycles)
    return pd.DataFrame({'component': [pdf['component'].iloc[0]], 
                         'no_of_cycles': [num_cycles], 
                         'cyclelist': [cyclelist]})


现在定义了Pandas UDF,我们继续将此函数应用于每个单独的连接组件,如下所示:

cycles=spark_df_edges.groupby('component').apply(find_cycles).show(truncate=False)


结果cycles dataframe看起来像这样:

+---------+------------+-------------------------+
|component|no_of_cycles|cyclelist                |
+---------+------------+-------------------------+
|1        |2           |[[1, 2, 3], [1, 2, 3, 4]]|
|2        |1           |[[5, 6, 7]]              |
+---------+------------+-------------------------+


最后,我们可以连接两个DataFrame:

from pyspark.sql.functions import broadcast
joined_df = spark_df_edges.join(broadcast(cycles), on='component', how='inner')
joined_df.show(truncate=False)


结果是

+---------+---+---+------------+-------------------------+
|component|src|dst|no_of_cycles|cyclelist                |
+---------+---+---+------------+-------------------------+
|1        |1  |2  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
|1        |2  |3  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
|1        |3  |4  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
|1        |3  |1  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
|1        |4  |1  |2           |[[1, 2, 3], [1, 2, 3, 4]]|
|2        |5  |6  |1           |[[5, 6, 7]]              |
|2        |6  |7  |1           |[[5, 6, 7]]              |
|2        |7  |5  |1           |[[5, 6, 7]]              |
+---------+---+---+------------+-------------------------+


注意,我们可以在这里使用broadcast,因为循环 Dataframe 中的行数是连接组件的数量,通常比边缘列表中的行数小得多。broadcast函数告诉Spark将较小的DataFrame广播到所有工作节点,如果一个DataFrame比另一个小得多,这可以加快join操作。

相关问题