How to compute differences between rows in pyspark?

Asked by sqyvllje on 2021-05-16 in Spark

This is my DataFrame in PySpark:

utc_timestamp               data    feed
2015-10-13 11:00:00+00:00   1       A
2015-10-13 12:00:00+00:00   5       A
2015-10-13 13:00:00+00:00   6       A
2015-10-13 14:00:00+00:00   10      B
2015-10-13 15:00:00+00:00   11      B

The values in the data column are cumulative.
I want to get this result (the difference between consecutive rows, within each feed):

utc_timestamp               data    feed
2015-10-13 11:00:00+00:00   1       A
2015-10-13 12:00:00+00:00   4       A
2015-10-13 13:00:00+00:00   1       A  
2015-10-13 14:00:00+00:00   10      B
2015-10-13 15:00:00+00:00   1       B

In pandas I would do it like this:

df["data"] -= (df.groupby("feed")["data"].shift(fill_value=0))

How can I do the same thing in PySpark?


ia2d9nvy #1

You can do this with the lag function over a window:

from pyspark.sql.window import Window
import pyspark.sql.functions as f

# Partition by feed and order by timestamp so lag looks at the previous row within each feed.
w = Window.partitionBy("feed").orderBy("utc_timestamp")

# lag(col, 1, 0) returns the previous row's value, or the default 0 for the first row of a partition.
df = df.withColumn("data", f.col("data") - f.lag(f.col("data"), 1, 0).over(w))

wtlkbnrh #2

You can use lag in place of shift, and coalesce(..., F.lit(0)) in place of fill_value=0:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Previous row within the same feed, ordered by timestamp.
window = Window.partitionBy("feed").orderBy("utc_timestamp")

# coalesce replaces the null that lag returns for the first row of each feed with 0.
data = F.col("data") - F.coalesce(F.lag(F.col("data")).over(window), F.lit(0))
df = df.withColumn("data", data)
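
Putting this answer together end to end, here is a minimal, self-contained sketch; the SparkSession setup and the inline sample rows are assumptions added for illustration, with column names taken from the question:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Sample rows copied from the question.
df = spark.createDataFrame(
    [
        ("2015-10-13 11:00:00+00:00", 1, "A"),
        ("2015-10-13 12:00:00+00:00", 5, "A"),
        ("2015-10-13 13:00:00+00:00", 6, "A"),
        ("2015-10-13 14:00:00+00:00", 10, "B"),
        ("2015-10-13 15:00:00+00:00", 11, "B"),
    ],
    ["utc_timestamp", "data", "feed"],
)

# Previous row's cumulative value within each feed, ordered by timestamp;
# the first row of a feed has no previous value, so coalesce falls back to 0.
window = Window.partitionBy("feed").orderBy("utc_timestamp")
diff = F.col("data") - F.coalesce(F.lag("data").over(window), F.lit(0))

df.withColumn("data", diff).orderBy("utc_timestamp").show(truncate=False)
# data should come out as 1, 4, 1 for feed A and 10, 1 for feed B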
