从PySpark连续行中获取第一个值

o3imoua4  于 2022-11-21  发布在  Spark
关注(0)|答案(2)|浏览(173)

我正在使用PySpark,我想按日期获得第一个状态,但只有在有连续状态的情况下,因为状态可以不止一次,但在一行中不能超过一个。下面是我的例子:
| 状态|创建时间|GP系列|
| - -|- -|- -|
| A级|2022年10月10日|A1级|
| B| 2022年10月12日|A1级|
| B| 2022年10月13日|A1级|
| C语言|2022年10月13日|A1级|
| C语言|2022年10月14日|A1级|
| B| 2022年12月15日|A1级|
| C语言|2022年12月16日|A1级|
| D级|2022年12月17日|A1级|
| A级|2022年12月18日|A1级|
这就是我所需要的:
| 状态|创建时间|GP系列|
| - -|- -|- -|
| A级|2022年10月10日|A1级|
| B| 2022年10月12日|A1级|
| C语言|2022年10月13日|A1级|
| B| 2022年12月15日|A1级|
| C语言|2022年12月16日|A1级|
| D级|2022年12月17日|A1级|
| A级|2022年12月18日|A1级|
我想的东西,但不知道如何实现:

when(row[status] == row[status] + 1) then row[status]

谢谢你的帮助

gfttwv5a

gfttwv5a1#

您可以使用按日期排序的窗口函数(这就是 Dataframe 顺序的重要性),并比较下一行中的状态是否与之前相同:您可以使用lag函数来完成此操作。

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.orderBy('date')

df = (df
 .withColumn('status_lag', F.lag('status').over(w))
 .filter((F.col('status_lag') != F.col('status')) | (F.col('status_lag').isNull()))
 .drop('status_lag')
)

+------+----------+
|status|      date|
+------+----------+
|     A|2022-10-10|
|     B|2022-10-12|
|     C|2022-10-13|
|     B|2022-12-15|
|     C|2022-12-16|
|     D|2022-12-17|
|     A|2022-12-18|
+------+----------+
bvpmtnay

bvpmtnay2#

我刚刚找到了问题的答案。我意识到我必须添加另一列来执行 partitionBy

w  = Window.partitionBy('GP').orderBy("create_when")
df_1= df_0.withColumn("lag",F.lag("status").over(w))\
           .where((F.col("status") != F.col("lag")) | (F.col("lag").isNull()))

相关问题