这是我在Pypark中的Dataframe:
utc_timestamp data feed
2015-10-13 11:00:00+00:00 1 A
2015-10-13 12:00:00+00:00 5 A
2015-10-13 13:00:00+00:00 6 A
2015-10-13 14:00:00+00:00 10 B
2015-10-13 15:00:00+00:00 11 B
价值观 data
是累积的。
我想得到这个结果(连续行之间的差异,按 feed
):
utc_timestamp data feed
2015-10-13 11:00:00+00:00 1 A
2015-10-13 12:00:00+00:00 4 A
2015-10-13 13:00:00+00:00 1 A
2015-10-13 14:00:00+00:00 10 B
2015-10-13 15:00:00+00:00 1 B
在 pandas
我会这样做:
df["data"] -= (df.groupby("feed")["data"].shift(fill_value=0))
我怎么能在Pypark里做同样的事?
2条答案
按热度按时间ia2d9nvy1#
您可以使用带有窗口的滞后函数来执行此操作:
wtlkbnrh2#
你可以用
lag
代替shift
,和coalesce( , F.lit(0))
代替fill_value=0
```from pyspark.sql.window import Window
import pyspark.sql.functions as F
window = Window.partitionBy("feed").orderBy("utc_timestamp")
data = F.col("data") - F.coalesce(F.lag(F.col("data")).over(window), F.lit(0))
df.withColumn("data", data)