PySpark: how do I use a window function to compute a rolling average over several months when there are nulls in between?

rwqw0loc asked 11 months ago in Spark

I have a DataFrame like the one below:

from pyspark.sql import Window
from pyspark.sql.functions import avg

df = spark.createDataFrame(
    [(1, 1, 10), (2, 1, 10), (3, 1, None), (4, 1, 10), (5, 1, 10), (6, 1, 20),
     (7, 1, 20), (1, 2, 10), (2, 2, 10), (3, 2, 10), (4, 2, 20), (5, 2, 20)],
    ["Month", "customer", "amount"])

# Trailing 6-month window: the current month plus the 5 months before it
windowPartition = Window.partitionBy("customer").orderBy("Month").rangeBetween(Window.currentRow - 5, Window.currentRow)
df = df.withColumn("avg_6_month", avg("amount").over(windowPartition))
display(df.orderBy("customer", "Month"))



(screenshot of the resulting DataFrame omitted)
I want to compute the 6-month average only when there is no null inside the window. With the window function above I get a result in which nulls are simply ignored: for customer 1 the average is computed even though there is a null, and for customer 2 the average is still attempted although there are only five months of data.



Since I only want the average when there are 6 consecutive non-null values, I created a count variable and computed the average only when the count is at least 6:

from pyspark.sql.functions import avg, broadcast, when

# Count the non-null amount values per customer
df2 = df.groupBy("customer").agg({"amount": "count"}).withColumnRenamed("count(amount)", "Amount_count")
df = df.join(broadcast(df2), on="customer", how="left")

windowPartition = Window.partitionBy("customer").orderBy("Month").rangeBetween(Window.currentRow - 5, Window.currentRow)
# Only compute the average when the customer has at least 6 non-null months
df = df.withColumn("avg_6_month",
                   when(df.Amount_count >= 6, avg("amount").over(windowPartition)).otherwise(None))
columns = ["Month", "customer", "amount", "avg_6_month"]
display(df.select(*columns).orderBy("customer", "Month"))



Now the average is not computed for customer 2, which is what I want. But for customer 1 I still don't want it computed, because there are no 6 consecutive months without a null in the amount column.
I am new to PySpark, but I know how to do this in R:

> amount <- c(10,10,NA,10,10,20,20,10)
> roll_mean(amount,  n = 3, align ="right", fill = NA)
[1]       NA       NA       NA       NA       NA 13.33333 16.66667 16.66667
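For reference, the same null-propagating semantics can be checked in plain pandas (my addition, not from the original post); by default rolling() requires a full window of non-NaN observations, which matches the R output above:

import pandas as pd

# A window containing any NaN yields NaN, because rolling() defaults to
# min_periods == window size and NaN values do not count as observations.
amount = pd.Series([10, 10, None, 10, 10, 20, 20, 10])
print(amount.rolling(window=3).mean())
# NaN NaN NaN NaN NaN 13.333333 16.666667 16.666667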

This is the result I expect in PySpark (expected-output screenshot omitted).

My real data has many nulls in different months for many customers, so I want to compute the average only over 6 consecutive months with no nulls. Is this possible with window functions, or is there another way to achieve this result?


whlutmcx1#

Absolutely possible! In your attempt, Amount_count is computed per customer, but you want the check per six-month window. Something like the following should work:

from pyspark.sql import Window
from pyspark.sql.functions import avg, col, lit, max, sum, when

start = 5
calculate_between = 6

df = spark.createDataFrame(
    [(1, 1, 10), (2, 1, 10), (3, 1, None), (4, 1, 10), (5, 1, 10), (6, 1, 20),
     (7, 1, 20), (1, 2, 10), (2, 2, 10), (3, 2, 10), (4, 2, 20), (5, 2, 20)],
    ["month", "customer", "amount"])

windowPartition = Window.partitionBy("customer").orderBy("month").rangeBetween(Window.currentRow - start, Window.currentRow)

# Flag windows that contain at least one null amount
df = df.withColumn("six_month_has_null", max(col("amount").isNull()).over(windowPartition))
df = df.withColumn("avg_6_month", avg("amount").over(windowPartition))
df = df.withColumn("avg_6_month", when(df.six_month_has_null, None).otherwise(col("avg_6_month")))
# Flag windows that actually span the full six months
df = df.withColumn("window_has_six_points", sum(lit(1)).over(windowPartition) == calculate_between)
df = df.withColumn("avg_6_month1", when(df.six_month_has_null | ~df.window_has_six_points, None).otherwise(col("avg_6_month")))
display(df)


Result: (screenshot omitted)
The same code gives the corresponding result (screenshot omitted) for the next DataFrame as well:

df = spark.createDataFrame(
    [(1, 1, 1), (2, 1, 2), (3, 1, None), (4, 1, 4), (5, 1, 5), (6, 1, 6),
     (7, 1, 7), (8, 1, 8), (9, 1, 9), (10, 1, None), (11, 1, 11), (12, 1, 12),
     (1, 2, 10), (2, 2, 10), (3, 2, 10), (4, 2, 20), (5, 2, 17)],
    ["month", "customer", "amount"])


x9ybnkn62#

I managed to solve it myself, but the solution is long and not good coding practice for production code. If anyone has a better approach, I would appreciate seeing it posted as an answer to this question.

from pyspark.sql import Window
from pyspark.sql.functions import avg, sum, when

df = spark.createDataFrame(
    [(1, 1, 1), (2, 1, 2), (3, 1, None), (4, 1, 4), (5, 1, 5), (6, 1, 6), (7, 1, 7),
     (1, 2, 10), (2, 2, 10), (3, 2, 10), (4, 2, 20), (5, 2, 17)],
    ["month", "customer", "amount"])

start             = 5
calculate_between = 6

# Create a count variable to find the number of non-null values per customer
df2 = df.groupBy("customer").agg({"amount": "count"}).withColumnRenamed("count(amount)", "Amount_count")
df = df.join(df2, on="customer", how="left")
windowPartition = Window.partitionBy("customer").orderBy("month").rangeBetween(Window.currentRow - start, Window.currentRow)

# Since I am interested in a 6-month average, the first 5 months should be NULL,
# so I force the first five months' average to NULL and only compute an average
# when the count column has at least 6 months of non-NULL values.
# Then I replace NULL with a dummy value 9876543210 and compute a sum over 6
# months. Any 6-row sum containing (a NULL replaced by) 9876543210 can only be
# 9876543210 itself (if the entries look like [0, 9876543210, 0, 0, 0, 0]) or
# greater. If that holds, the average should be NULL; otherwise I compute the
# 6-month average (which means there is no NULL over 6 consecutive months).

df = df.withColumn("avg_6_month", when((df.Amount_count >= calculate_between) & (df.month > start), avg("amount").over(windowPartition)).otherwise(None))
df = df.na.fill(value=9876543210, subset=["amount"])
df = df.withColumn("sum_6_month2", when((df.Amount_count >= calculate_between) & (df.month > start), sum("amount").over(windowPartition)).otherwise(None))
df = df.withColumn("avg_6_month_final", when(df.sum_6_month2 >= 9876543210, None).otherwise(df.avg_6_month))
df = df.withColumn("amount", when(df.amount == 9876543210, None).otherwise(df.amount))

columns = ["customer", "month", "amount", "Amount_count", "avg_6_month_final"]
display(df.select(*columns).orderBy("customer", "month"))


Result: (screenshot omitted)
This also works for a DataFrame with 12 months and more than one NULL value:

df = spark.createDataFrame(
    [(1, 1, 1), (2, 1, 2), (3, 1, None), (4, 1, 4), (5, 1, 5), (6, 1, 6),
     (7, 1, 7), (8, 1, 8), (9, 1, 9), (10, 1, None), (11, 1, 11), (12, 1, 12),
     (1, 2, 10), (2, 2, 10), (3, 2, 10), (4, 2, 20), (5, 2, 17)],
    ["month", "customer", "amount"])


With start = 1 and calculate_between = 2, the result is as expected (screenshot omitted).
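For what it's worth, the sentinel fill and the per-customer join can be avoided by counting nulls inside the window directly; a minimal sketch of that idea (my own addition, not the answerer's code):

from pyspark.sql import Window
from pyspark.sql import functions as F

w = Window.partitionBy("customer").orderBy("month").rangeBetween(-(calculate_between - 1), 0)

# A window qualifies when it spans the full number of rows and has no nulls,
# so no dummy value and no second aggregate DataFrame are needed.
nulls_in_window = F.sum(F.col("amount").isNull().cast("int")).over(w)
rows_in_window = F.count(F.lit(1)).over(w)

df = df.withColumn(
    "avg_6_month_final",
    F.when(
        (nulls_in_window == 0) & (rows_in_window == calculate_between),
        F.avg("amount").over(w),
    ),
)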

