df_edges = pd.DataFrame({
'src': [1, 2, 3, 3, 4,5,6,7],
'dst': [2, 3, 4, 1, 1,6,7,5],
'component' : [1,1,1,1,1,2,2,2]
})
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Convert pandas example DataFrame to Spark DataFrame
# this is in place of the processed output
# derived from both the original DataFrame
# and the connected components output.
spark_df_edges = spark.createDataFrame(df_edges)
3条答案
按热度按时间flseospp1#
你可以使用BFS算法来寻找图中的圈
yqkkidmi2#
我对“Spark图文框”一无所知
希望您会发现它很有用,下面是如何修改BFS来查找周期:
在标准的BFS算法中,代码会跟踪哪些顶点以前被访问过。当搜索当前顶点的可达邻居时,跳过先前访问过的顶点。
在为寻找循环而修改的BFS中,遇到先前访问过的顶点可能会发生,因为存在循环。
为了检查这一点,应用Dijsktra来查找从当前顶点开始,经过图的其余部分,并返回到先前访问过的顶点的最短路径。如果这样的路径存在,那么它就是一个循环。
下面是一个例子
的数据
这是算法的基本要素,但需要处理一些重要的细节:
也许一个实现这一点的C++代码的链接会有帮助?
7y4bm7vi3#
虽然GraphFrames本身可能无法完全为您的任务提供开箱即用的必要功能,但将其与NetworkX和PandasUDF结合使用被证明是一种有效的解决方案。首先,让我们来研究一下NetworkX的功能,尤其是与您的示例相关的功能。
示例图的绘制:
的数据
Json的算法是NetworkX中simple_cycles函数的基础,与其他基于DFS修改的算法(source)相比,它具有更好的时间复杂度。
在NetworkX中查找循环的代码:
字符串
NetworkX函数simple_cycles显然提供了所需的功能。然而,考虑到潜在的可扩展性问题和在Spark生态系统中运行的需求,寻求以并行方式运行的解决方案是有益的。这就是PandasUDF(矢量化UDF)实用程序的亮点所在。为了制定一个可扩展和可推广的解决方案,我们的第一步是执行连接组件操作。GraphFrames方便地提供了这种功能,如下所示:
型
从connected components函数获得输出后(通常为[node,component id]格式),您可以使用此component id扩展原始的edge DataFrame。这将导致Spark DataFrame结构化为[src,dst,component]。
为了简洁起见,我将在示例的后续步骤中手动生成这样一个Spark DataFrame。为了说明循环查找函数在不同连接组件上的并行化能力,我还将一个附加子图的边合并到边列表中。
假设这是扩展的边缘列表
型
以下是扩展图的可视化,由两个不同的连接组件组成:
的
这就是使用.show()时扩展的边列表(现在与组件ID集成)的显示方式
型
接下来,我们定义一个Pandas UDF,它可以应用于每组连接的组件。除了查找循环之外,该函数还旨在返回有用的信息,例如找到的循环计数和构成每个组件的每个循环的边列表:
型
现在定义了Pandas UDF,我们继续将此函数应用于每个单独的连接组件,如下所示:
型
结果cycles dataframe看起来像这样:
型
最后,我们可以连接两个DataFrame:
型
结果是
型
注意,我们可以在这里使用
broadcast
,因为循环 Dataframe 中的行数是连接组件的数量,通常比边缘列表中的行数小得多。broadcast函数告诉Spark将较小的DataFrame广播到所有工作节点,如果一个DataFrame比另一个小得多,这可以加快join操作。