PySpark: group by and summarize in a new column the difference between the oldest and newest timestamps

g52tjvyc · posted 2023-02-18 in Spark

I have a PySpark DataFrame with the following columns:

  • session ID
  • timestamp
data = [
    ("ID1", "2021-12-10 10:00:00"),
    ("ID1", "2021-12-10 10:05:00"),
    ("ID2", "2021-12-10 10:20:00"),
    ("ID2", "2021-12-10 10:24:00"),
    ("ID2", "2021-12-10 10:26:00"),
]

I want to group by session and add a new column named duration, which is the difference in seconds between the earliest and latest timestamps of that session:

ID1: 300
ID2: 360
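
(That is, for ID1: 10:05:00 − 10:00:00 = 300 s; for ID2: 10:26:00 − 10:20:00 = 360 s.)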

How can I achieve this?
Thanks!

zwghvu4y1#

You can use an aggregation function such as collect_list and then take the max and min of the resulting list. To get the duration in seconds, convert the timestamps with unix_timestamp and subtract.
Try this:

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    array_max,
    collect_list,
    array_min,
    unix_timestamp,
)

data = [
    ("ID1", "2021-12-10 10:00:00"),
    ("ID1", "2021-12-10 10:05:00"),
    ("ID2", "2021-12-10 10:20:00"),
    ("ID2", "2021-12-10 10:24:00"),
    ("ID2", "2021-12-10 10:26:00"),
]
# in the pyspark shell "spark" already exists; otherwise create it
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(data, ["sessionId", "time"]).select(
    "sessionId", col("time").cast("timestamp")
)

df2 = (
    df.groupBy("sessionId")
    .agg(
        # collect all timestamps per session, then take the largest/smallest
        array_max(collect_list("time")).alias("max_time"),
        array_min(collect_list("time")).alias("min_time"),
    )
    # difference of the unix timestamps gives the duration in seconds
    .withColumn("duration", unix_timestamp("max_time") - unix_timestamp("min_time"))
)
df2.show()
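
Run against the sample data, df2.show() should print something like:

+---------+-------------------+-------------------+--------+
|sessionId|           max_time|           min_time|duration|
+---------+-------------------+-------------------+--------+
|      ID1|2021-12-10 10:05:00|2021-12-10 10:00:00|     300|
|      ID2|2021-12-10 10:26:00|2021-12-10 10:20:00|     360|
+---------+-------------------+-------------------+--------+

As a side note, collecting the whole list is not strictly necessary. A simpler sketch (not part of the original answer) aggregates min and max directly; df3 is a name introduced here for illustration:

from pyspark.sql.functions import max as sql_max, min as sql_min, unix_timestamp

# aggregate the earliest and latest timestamp per session in one pass,
# then subtract their unix timestamps to get the duration in seconds
df3 = df.groupBy("sessionId").agg(
    (unix_timestamp(sql_max("time")) - unix_timestamp(sql_min("time"))).alias("duration")
)
df3.show()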
