如何基于连续行中的值聚合pyspark

ijxebb2r  于 2023-05-16  发布在  Spark
关注(0)|答案(1)|浏览(179)

我有输入dataframe其中有3列时间,名称,标志。我想聚合成一个开始和结束列,其中NameFlag具有相同的值。
输入 Dataframe
| 时间|姓名|旗|
| --------------|--------------|--------------|
| 2023年5月1日1:01|彼得|1|
| 2023年5月1日1:02|彼得|1|
| 2023年5月1日1:03|彼得|1|
| 2023年5月1日1:04|彼得|0|
| 2023年5月1日1:05|彼得|0|
| 2023年5月1日1:06|彼得|1|
| 2023年5月1日1:07|彼得|1|
| 2023年5月1日1:08|彼得|1|
| 2023年5月1日1:01|约翰|1|
| 2023年5月1日1:02|约翰|0|
| 2023年5月1日1:03|约翰|0|
| 2023年5月1日1:04|约翰|0|
| 2023年5月1日1:05|约翰|0|
| 2023年5月1日1:06|约翰|0|
| 2023年5月1日1:07|约翰|1|
| 2023年5月1日1:08|约翰|1|
| 2023年5月2日1:10|彼得|1|
| 2023年5月2日1:11|彼得|1|
| 2023年5月2日1:20|约翰|0|
| 2023年5月2日1:21|约翰|0|
| 2023年5月2日1:22|约翰|0|
输出 Dataframe
| 开始|结束|姓名|旗|
| --------------|--------------|--------------|--------------|
| 2023年5月1日1:01| 2023年5月1日1:03|彼得|1|
| 2023年5月1日1:04| 2023年5月1日1:05|彼得|0|
| 2023年5月1日1:06| 2023年5月1日1:08|彼得|1|
| 2023年5月2日1:10| 2023年5月2日1:11|彼得|1|
| 2023年5月1日1:01| 2023年5月1日1:01|约翰|1|
| 2023年5月1日1:02| 2023年5月1日1:06|约翰|0|
| 2023年5月1日1:07| 2023年5月1日1:08|约翰|1|
| 2023年5月2日1:20| 2023年5月2日1:22|约翰|0|
在这种情况下,连续的行意味着在时间上连续。
1:08和1:10未合并,因为行1:08和1:10之间存在间隙(缺少1:09
你能告诉我怎么做吗?

bbmckpt7

bbmckpt71#

首先,您要创建符合条件的分组。要创建它,一般的提示是创建一个标志,当你想要分离一个组时,标志为1,当你想要合并到前一个组时,标志为0。然后,cumsum在这个标志将导致你想要的分组。
你的条件是

from pyspark.sql import functions as F
# covert Time to timestamp
df = df.withColumn('timestamp', F.to_timestamp('Time', 'M/d/yyyy H:mm'))

w = Window.partitionBy('Name').orderBy('timestamp')

# If previous Flag is different from current Flag
F.lag('Flag').over(w) != F.col('Flag'))

# OR previous timestamp is more than 1 minute ago
| (((F.col('timestamp').cast('long') - F.lag('timestamp').over(w).cast('long')) / 60) > 1)

在这些条件下,将分组创建为grp列,并使用该列进行聚合。

w = Window.partitionBy('Name').orderBy('timestamp')
df = (df.withColumn('timestamp', F.to_timestamp('Time', 'M/d/yyyy H:mm'))
      .withColumn('grp', (F.lag('Flag').over(w).isNull() 
                          | (F.lag('Flag').over(w) != F.col('Flag'))
                          | (((F.col('timestamp').cast('long') - F.lag('timestamp').over(w).cast('long')) / 60) > 1)).cast('int'))
      .withColumn('grp', F.sum('grp').over(w))
      .groupby('Name', 'grp')
      .agg(F.min('Time').alias('Start'), F.max('Time').alias('End'), F.first('Flag').alias('Flag')))

相关问题