如何在PySpark collect_list中维护排序顺序并收集多个列表

fnvucqvd  于 2023-08-02  发布在  Spark
关注(0)|答案(1)|浏览(129)

我想维护日期排序顺序,对多个列使用collect_list,所有列都具有相同的日期顺序。我需要他们在同一个dataframe,这样我就可以利用创建一个时间序列模型输入。下面是“train_data”的示例:


的数据
我使用了一个带有PartitionBy的Window,通过为每个Syscode_Stn调优_evnt_start_dt来确保排序顺序。我可以用下面的代码创建一个列:

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

sorted_list_df = train_data
.withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w)
           )\
.groupBy('Syscode_Stn')\
.agg(F.max('spp_imp_daily').alias('spp_imp_daily'))

字符串
但我如何在同一个新 Dataframe 中创建两个列呢?

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

sorted_list_df = train_data
.withColumn('spp_imp_daily',F.collect_list('spp_imp_daily').over(w))
.withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))
.groupBy('Syscode_Stn')
.agg(F.max('spp_imp_daily').alias('spp_imp_daily')))



请注意,MarchMadInd未显示在屏幕截图中,但包含在train_data中。解释一下我是如何走到今天这一步的:https://stackoverflow.com/a/49255498/8691976

w46czmvw

w46czmvw1#

是的,正确的方法是添加连续的.withColumn语句,然后是一个.agg语句,用于删除每个数组的重复项。

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

    sorted_list_df = train_data.withColumn('spp_imp_daily', 
    F.collect_list('spp_imp_daily').over(w)
                                      )\
    .withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))\

    .groupBy('Syscode_Stn')\
    .agg(F.max('spp_imp_daily').alias('spp_imp_daily'), 
     F.max('MarchMadInd').alias('MarchMadInd')
    )

字符串

相关问题