在Spark中合并关联项目

ippsafx7  于 2022-12-19  发布在  Apache
关注(0)|答案(3)|浏览(129)

在Spark中,我有一个很大的元素列表(数百万),其中包含彼此关联的项。
1: ("A", "C", "D") #此数组中的每个项都与数组中的任何其他元素相关联,因此A和C相关联,A和D相关联,C和D相关联。
2: ("F", "H", "I", "P")
3: ("H", "I", "D")
4: ("X", "Y", "Z")
我想执行一个操作,在存在跨列表关联的情况下合并关联。在上面的示例中,我们可以看到前三行的所有项目都相互关联(第1行和第2行应该组合,因为根据第3行,D和I是关联的)。因此,输出应为:
("A", "C", "D", "F", "H", "I", "P")
("X", "Y", "Z")
在Spark中我可以使用什么类型的转换来执行这个操作?我看了各种各样的数据分组方法,但是还没有找到一种明显的方法来合并共享公共元素的列表元素。
谢谢大家!

bprjcwpo

bprjcwpo1#

正如一些用户已经说过的,这可以看作是一个图问题,即您希望找到图中的连通分支。
由于你正在使用spark,我认为这是一个很好的机会来展示如何在python中使用graphx,为了运行这个例子,你需要pyspark和graphframes python包。

from pyspark.sql import SparkSession
from graphframes import  GraphFrame
from pyspark.sql import functions as f

spark = (
    SparkSession.builder.appName("test")
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12")
    .getOrCreate()
)
# graphframe requires defining a checkpoint dir.
spark.sparkContext.setCheckpointDir("/tmp/checkpoint")
# lets create a sample dataframe
df = spark.createDataFrame(
    [
        (1, ["A", "C", "D"]),
        (2, ["F", "H", "I", "P"]),
        (3, ["H", "I", "D"]),
        (4, ["X", "Y", "Z"]),
    ],
    ["id", "values"],
)

# We can use the explode function to explode the lists in new rows having a list of (id, node)
df = df.withColumn("node", f.explode("values"))
df.createOrReplaceTempView("temp_table")
# Then we can join the table with itself to generate an edge table with source and destination nodes.
edge_table = spark.sql(
    """
SELECT
distinct a.node as src, b.node as dst 
FROM
temp_table a join temp_table b 
ON a.id=b.id AND a.node != b.node
"""
)

# Now we define our graph by using an edge table (a table with the node ids)
# and our edge table
# then we use the connectedComponents method to find the components
cc_df = GraphFrame(
    df.selectExpr("node as id").drop_duplicates(), edge_table
).connectedComponents()

# The cc_df dataframe will have two columns, the node id and the connected component.
# To get the desired result we can group by the component and create a list
cc_df.groupBy("component").agg(f.collect_list("id")).show(truncate=False)

您将得到的输出如下所示:

可以使用以下命令安装依赖项:

pip install -q pyspark==3.2 graphframes
bf1o4zei

bf1o4zei2#

问题中可能没有足够的信息来完全解决这个问题,但我建议使用GraphX创建一个邻接矩阵/列表来将其表示为一个图,希望从那里你可以解决剩下的问题。
https://en.wikipedia.org/wiki/Adjacency_matrix
https://spark.apache.org/docs/latest/graphx-programming-guide.html

wdebmtf2

wdebmtf23#

如果您使用的是PySpark内核,那么这个解决方案应该可以工作

iset = set([frozenset(s) for s in tuple_list])  # Convert to a set of sets
result = []
while(iset):                  # While there are sets left to process:
    nset = set(iset.pop())      # Pop a new set
    check = len(iset)           # Does iset contain more sets
    while check:                # Until no more sets to check:
        check = False
        for s in iset.copy():       # For each other set:
            if nset.intersection(s):  # if they intersect:
                check = True            # Must recheck previous sets
                iset.remove(s)          # Remove it from remaining sets
                nset.update(s)          # Add it to the current set
    result.append(tuple(nset))  # Convert back to a list of tuples

相关问题