我有一个像下面这样的框架
df = spark.createDataFrame(
[(1,1,10), (2,1,10), (3,1,None),(4,1,10),(5,1,10),(6,1,20) \
,(7,1,20), (1,2,10),(2,2,10),(3,2,10),(4,2,20),(5,2,20)],
["Month","customer","amount"])
windowPartition = Window.partitionBy("customer").orderBy("Month").rangeBetween(Window.currentRow-5,Window.currentRow )
df=df.withColumn("avg_6_month",avg('amount').over(windowPartition))
display(df.orderBy("customer","Month"))
字符串
的数据
我想执行平均超过6个月的数据只有当之间没有空值。我能够实现以下结果使用窗口函数,其中空值被忽略。对于客户1,即使有一个空值平均计算忽略空值。对于客户2,只有5个月的数据,仍然试图计算平均。
的
由于我只想在有6个连续变量没有空值时计算平均值,所以我创建了一个count变量,只在count大于或等于6时计算平均值,结果是
df2 = df.groupBy("customer").agg({"amount":"count"}).withColumnRenamed("count(amount)", "Amount_count" )
df= df.join(broadcast(df2), on='customer', how='left')
windowPartition = Window.partitionBy("customer").orderBy("month").rangeBetween(Window.currentRow-5,Window.currentRow )
df=df.withColumn("avg_6_month",avg('amount').over(windowPartition))
df=df.withColumn("avg_6_month",when(df.Amount_count >=6, avg('amount').over(windowPartition)).otherwise(None))
columns=['month', 'customer','amount','avg_6_month']
display(df.select(*columns).orderBy("customer","month"))
型
的
所以现在客户2的平均值没有计算,这是我想要的。但是对于客户1,我仍然不想计算平均值,因为没有连续6个月的数据在金额列上没有空值。
我是pyspark的新手,我知道如何在R中实现这一点。
> amount <- c(10,10,NA,10,10,20,20,10)
> roll_mean(amount, n = 3, align ="right", fill = NA)
[1] NA NA NA NA NA 13.33333 16.66667 16.66667
型
我在Pyspark中期待下面的结果。
我的实际数据在不同的月份有许多空值,对于许多客户。所以我想只计算连续6个月没有空值的平均值。这是否可以使用窗口函数或有其他方法来实现这个结果?
的
2条答案
按热度按时间whlutmcx1#
绝对有可能!在您的尝试中,您的amount_count是按客户计算的,但您希望每六个月执行此操作。类似于以下操作应该可以工作:
字符串
结果
x1c 0d1x的数据
对于下一个 Dataframe ,相同的代码给出
型
的
x9ybnkn62#
我设法解决了它自己,但它是一个漫长的一个不是一个很好的编码实践的生产代码。如果有人有一个更好的方法来解决这个问题,那么我会欣赏他们的想法,考虑作为一个答案,这个问题。
字符串
结果
的数据
这也适用于具有12个月和超过1个NULL值的 Dataframe
型
的
start=1,calculate_between = 2,结果为
的