识别与pysparkDataframe中的当前值不同的最新记录

ubby3x7f  于 2021-05-16  发布在  Spark
关注(0)|答案(1)|浏览(326)

我有一个pysparkDataframe,其中每个用户在某个时间点都有特定的状态,如下面的虚拟数据

--------------------------
    |user_id| status| month  |
    --------------------------
    | 1     | A     | 12/2020|
    | 1     | A     | 11/2020|
    | 1     | B     | 10/2020|
    | 1     | B     | 09/2020|
    | 1     | A     | 08/2020|
    | 1     | C     | 07/2020|
    | 2     | A     | 12/2020|
    | 2     | A     | 11/2020|
    | 2     | A     | 10/2020|
    | 2     | B     | 09/2020|

我想在pyspark数据框中创建另外两个列(previous\u status\u value和previous\u status\u month),对于每个记录,这两个列表示用户与记录中的状态不同的最近日期,以及该值是什么。使用上述虚拟数据,结果将是

------------------------------------------------------------------------
    |user_id| status| month  | previous_status_value| previous_status_month|
    ------------------------------------------------------------------------
    | 1     | A     | 12/2020| B                    | 10/2020              |
    | 1     | A     | 11/2020| B                    | 10/2020              |
    | 1     | B     | 10/2020| A                    | 08/2020              |
    | 1     | B     | 09/2020| A                    | 08/2020              |
    | 1     | A     | 08/2020| C                    | 07/2020              |
    | 1     | C     | 07/2020| Null                 | Null                 |
    | 2     | A     | 12/2020| B                    | 09/2020              |
    | 2     | A     | 11/2020| B                    | 09/2020              |
    | 2     | A     | 10/2020| B                    | 09/2020              |
    | 2     | B     | 09/2020| Null                 | Null                 |

dataframe有数百万条记录,所以我试图使用窗口函数(类似于这个答案)来实现这一点,但没有实现。

uurity8g

uurity8g1#

使用查找状态更改的位置 lead ,只保留 status 以及 month 与状态更改相对应,并用null屏蔽,否则使用 when(F.col('begin'), F.col('status')) ,并使用 F.last(..., ignorenulls=True) .

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('user_id').orderBy('month')
begin = F.lead('status').over(w) != F.col('status')
df = df.select('*', begin.alias('begin'))

w = w.rowsBetween(Window.unboundedPreceding, -1)
previous_status_value = F.last(F.when(F.col('begin'), F.col('status')), ignorenulls=True).over(w).alias('previous_status_value')
previous_status_month = F.last(F.when(F.col('begin'), F.col('month')), ignorenulls=True).over(w).alias('previous_status_month ')

df = df.select('*', previous_status_value, previous_status_month).drop('begin').orderBy('user_id', F.col('month').desc())

df.show()
+-------+------+-------+---------------------+----------------------+
|user_id|status|  month|previous_status_value|previous_status_month |
+-------+------+-------+---------------------+----------------------+
|      1|     A|12/2020|                    B|               10/2020|
|      1|     A|11/2020|                    B|               10/2020|
|      1|     B|10/2020|                    A|               08/2020|
|      1|     B|09/2020|                    A|               08/2020|
|      1|     A|08/2020|                    C|               07/2020|
|      1|     C|07/2020|                 null|                  null|
|      2|     A|12/2020|                    B|               09/2020|
|      2|     A|11/2020|                    B|               09/2020|
|      2|     A|10/2020|                    B|               09/2020|
|      2|     B|09/2020|                 null|                  null|
+-------+------+-------+---------------------+----------------------+

相关问题