我有一个PySpark数据框架,其中包含一个包含列表的列。列表项可能会跨行重叠。我需要通过'orderCol'列排序的行的唯一列表元素的累积总和。在我的应用程序中,每个列表中可能有数百万行和数百个项。我可以'我似乎不知道如何在PySpark中做到这一点,以便它可以扩展,并且会感谢任何关于如何解决它的大大小小的想法。
我已经发布了输入和期望的输出,以给予我正在努力实现的目标。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("myApp") \
.getOrCreate()
data = [{"node": 'r1', "items": ['a','b','c','d'], "orderCol": 1},
{"node": 'r2', "items": ['e','f','g','a'], "orderCol": 2},
{"node": 'r3', "items": ['h','i','g','b'], "orderCol": 3},
{"node": 'r4', "items": ['j','i','f','c'], "orderCol": 4},
]
df = spark.createDataFrame(data)
df.show()
data_out = [{"node": 'r1', "items": ['a','b','c','d'], "orderCol": 1, "cumulative_item_count": 4},
{"node": 'r2', "items": ['e','f','g','a'], "orderCol": 2, "cumulative_item_count": 7},
{"node": 'r3', "items": ['h','i','g','b'], "orderCol": 3, "cumulative_item_count": 9},
{"node": 'r4', "items": ['j','i','f','c'], "orderCol": 4, "cumulative_item_count": 10},
]
df_out = spark.createDataFrame(data_out)
df_out.show()
1条答案
按热度按时间ru9i0ody1#
尝试使用窗口函数使用**
unboundedPreceeding
到currentRow
**。然后
flatten
嵌套数组。最后,我们将
array_distinct
+size
函数来计算数组中的不同元素。Example: