PySpark: use a window to return a descriptive column

wbrvyc0a · posted 2022-12-11 · Spark

I have a DataFrame with two columns: process name and process rank. I want to use a window function to add two more columns that find the minimum- and maximum-ranked process names and show them on every row.
See the example columns 'Max Rank Process (output I want using windowing)' and 'Min Rank Process (output I want using windowing 2)' for the output I actually want. It seems a window cannot return a plain column value without some kind of aggregate function. How can I accomplish this, with or without a window?

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("Process", StringType(), True),
    StructField("Process_rank", IntegerType(), True),
    StructField("Max Rank Process (output I want using windowing)", StringType(), True),
    StructField("Min Rank Process (output I want using windowing 2)", StringType(), True),
])

data = [("Inventory", 1, "Retire", "Inventory"),
        ("Data availability", 2, "Retire", "Inventory"),
        ("Code Conversion", 3, "Retire", "Inventory"),
        ("Retire", 4, "Retire", "Inventory")]

df = spark.createDataFrame(data=data, schema=schema)

############ Window definitions
# window 1: partition by process name, order by rank descending
w_max_rnk = Window.partitionBy("Process").orderBy(F.col("Process_rank").desc())
# window 2: partition by process name, order by rank ascending
w_min_rnk = Window.partitionBy("Process").orderBy(F.col("Process_rank").asc())

# Windowed columns meant to surface the max- and min-ranked processes.
# This fails: a bare column is not an aggregate or window function, so
# Spark rejects F.col("Process").over(...) with an AnalysisException.
df = df.withColumn("max_ranked_process", F.col("Process").over(w_max_rnk)) \
    .withColumn("min_ranked_process", F.col("Process").over(w_min_rnk))

kzmpq1sx #1

Performance won't be great, because a window with no partitionBy moves every row into a single partition, so this only suits smaller DataFrames, but it should give the correct result.

# No partitionBy, so the whole DataFrame is one window partition.
# rowsBetween(unboundedPreceding, unboundedFollowing) lets first()/last()
# see all rows in rank order instead of only rows up to the current one.
w = Window.orderBy('Process_rank').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df = (df.withColumn('Max Rank Process', F.last('Process').over(w))
      .withColumn('Min Rank Process', F.first('Process').over(w)))
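
If the single-partition window becomes a bottleneck, one alternative (a sketch, not part of the original answer) is to compute the two extreme names with an ordinary aggregation and broadcast the one-row result back onto every row. It relies on Spark comparing structs field by field, so max/min over a (rank, name) struct returns the name attached to the extreme rank. Apply it to the original DataFrame in place of the window version:

# One-row DataFrame with the process names at the max and min rank.
# Structs compare field by field, so max/min over (rank, name) pairs
# pick out the name that travels with the extreme rank.
extremes = df.agg(
    F.max(F.struct("Process_rank", "Process"))["Process"].alias("Max Rank Process"),
    F.min(F.struct("Process_rank", "Process"))["Process"].alias("Min Rank Process"),
)
# Broadcast the single aggregated row onto every row of the original data.
df = df.crossJoin(F.broadcast(extremes))

On the sample data either approach yields 'Retire' (rank 4) as the max-rank process and 'Inventory' (rank 1) as the min-rank process, matching the expected columns in the question.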
