我有一个我用的Spark collect_list
以及 PartitionBy
提取与一组列关联的值的列表。因此,对于分组列,我现在有了一个新列,其中包含与group关联的元素列表。但是,我希望这个列表被进一步细分,以便它包含嵌套列表。同样重要的是,这些列的顺序是按日期排序的。见下表:
data = [
["ABC", 1, 3, "2020-04-01", "product_one"],
["ABC", 1, 3, "2020-04-01", "product_two"],
["ABC", 1, 3, "2020-04-12", "product_one"],
["ABC", 1, 3, "2020-04-12", "product_two"],
]
df = pd.DataFrame(data, columns=["ID", "Ref_No", "Number", "Date", "Product"])
sdf = spark.createDataFrame(df)
w = Window.partitionBy("ID", "Ref_No", "Number").orderBy("Date")
grouped_sdf = (
sdf.withColumn(
"Products",
spark_fns.collect_list("Product").over(w),
)
.withColumn(
"Dates",
spark_fns.collect_set("Date").over(w),
)
.groupby("ID", "Ref_No", "Number")
.agg(
spark_fns.max("Products").alias("Products"),
spark_fns.max("Dates").alias("Dates"),
)
)
ID Ref_No Number Products Dates
ABC 1 3 [product_one, [2020-04-01,
product_two, 2020-04-12]
product_one,
product_two]
我想要这一栏的清单 Products
实际上也包含与每个计时相关联的列表。所以期望的输出是:
所以我们知道第一个列表(在列表中)与第一个日期相关联,然后列表中的第二个列表与第二个日期相关联。
ID Ref_No Number Products Dates
ABC 1 3 [[product_one, [2020-04-01,
product_two], 2020-04-12]
[product_one,
product_two]]
1条答案
按热度按时间0yycz8jy1#
您可以执行两个分组方式和收集列表: