在pySpark中计算非唯一列表元素的累积和

p8ekf7hl  于 2023-04-21  发布在  Apache
关注(0)|答案(1)|浏览(94)

我有一个PySpark数据框架,其中包含一个包含列表的列。列表项可能会跨行重叠。我需要通过'orderCol'列排序的行的唯一列表元素的累积总和。在我的应用程序中,每个列表中可能有数百万行和数百个项。我可以'我似乎不知道如何在PySpark中做到这一点,以便它可以扩展,并且会感谢任何关于如何解决它的大大小小的想法。
我已经发布了输入和期望的输出,以给予我正在努力实现的目标。

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

data = [{"node": 'r1', "items": ['a','b','c','d'], "orderCol": 1},
        {"node": 'r2', "items": ['e','f','g','a'], "orderCol": 2},
        {"node": 'r3', "items": ['h','i','g','b'], "orderCol": 3},
        {"node": 'r4', "items": ['j','i','f','c'], "orderCol": 4},
        ]

df = spark.createDataFrame(data)
df.show()

data_out = [{"node": 'r1', "items": ['a','b','c','d'], "orderCol": 1, "cumulative_item_count": 4},
        {"node": 'r2', "items": ['e','f','g','a'], "orderCol": 2, "cumulative_item_count": 7},
        {"node": 'r3', "items": ['h','i','g','b'], "orderCol": 3, "cumulative_item_count": 9},
        {"node": 'r4', "items": ['j','i','f','c'], "orderCol": 4, "cumulative_item_count": 10},
        ]

df_out = spark.createDataFrame(data_out)
df_out.show()
ru9i0ody

ru9i0ody1#

尝试使用窗口函数使用**unboundedPreceedingcurrentRow**。
然后flatten嵌套数组。
最后,我们将array_distinct + size函数来计算数组中的不同元素。

Example:

from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import *
data = [{"node": 'r1', "items": ['a','b','c','d'], "orderCol": 1},
        {"node": 'r2', "items": ['e','f','g','a'], "orderCol": 2},
        {"node": 'r3', "items": ['h','i','g','b'], "orderCol": 3},
        {"node": 'r4', "items": ['j','i','f','c'], "orderCol": 4},
        ]

w=Window.partitionBy(lit(1)).orderBy("orderCol").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = spark.createDataFrame(data).\
  withColumn("temp_col",collect_list(col("items")).over(w)).\
  withColumn("cumulative_item_count",size(array_distinct(flatten(col("temp_col")))))
df.show(20,False)

#+------------+----+--------+--------------------------------------------------------+---------------------+
#|items       |node|orderCol|temp_col                                                |cumulative_item_count|
#+------------+----+--------+--------------------------------------------------------+---------------------+
#|[a, b, c, d]|r1  |1       |[[a, b, c, d]]                                          |4                    |
#|[e, f, g, a]|r2  |2       |[[a, b, c, d], [e, f, g, a]]                            |7                    |
#|[h, i, g, b]|r3  |3       |[[a, b, c, d], [e, f, g, a], [h, i, g, b]]              |9                    |
#|[j, i, f, c]|r4  |4       |[[a, b, c, d], [e, f, g, a], [h, i, g, b], [j, i, f, c]]|10                   |

相关问题