How to generate session windows in a streaming query in PySpark?


The code below works fine in batch mode, but fails in a streaming query with the infamous AnalysisException:
Non-time-based windows are not supported on streaming DataFrames/Datasets

from pyspark.sql.functions import *
from pyspark.sql.window import Window

temp = [
  ('Alice', 1),
  ('Alice', 60),
  ('Alice', 160),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1112),
  ('Bob', 3),
  ('Alice', 2),
  ('Bob', 2),
  ('Alice', 3),
  ('Bob', 1)
]

temp_df = spark.createDataFrame(temp, ["user", "ev_timestamp"])

maxSessionDuration = 60 * 10  # maximum gap of 10 minutes between events in the same session
client_fp_time_window = Window.partitionBy("user").orderBy("ev_timestamp")

# Flag a row as the start of a new session when the gap to the previous event
# reaches maxSessionDuration. For the first row per user, lag() returns null,
# so the when() condition is null and falls through to otherwise(1).
rowsWithSessionIds = temp_df \
    .select("user", "ev_timestamp", lag("ev_timestamp", 1).over(client_fp_time_window).alias("prevEvTimestamp")) \
    .select("user", "ev_timestamp",
            when(
              (col("ev_timestamp").cast('long') - col("prevEvTimestamp").cast('long')) < maxSessionDuration, 0)
            .otherwise(1).alias("isNewSession")
    ) \
    .select("user", "ev_timestamp",
            sum("isNewSession").over(client_fp_time_window).alias("sessionId"))  # running sum numbers the sessions

display(rowsWithSessionIds)

sessionsDF = rowsWithSessionIds \
  .groupBy("user", "sessionId") \
  .agg(min("ev_timestamp").alias("startTime"), max("ev_timestamp").alias("endTime"), count("*").alias("count")) \
  .alias('Session')

display(sessionsDF)

I understand this is because the lag() function is not supported in streaming queries. The recommended alternative is the mapGroupsWithState() method, but that is limited to Scala/Java.
How can I achieve this in PySpark? Or what other alternatives are there for structured sessionization in PySpark?
The desired output for each batch looks like this:

user    sessionId   startTime   endTime count
Bob     1           1           3       3
Alice   1           1           160     5
Alice   2           1111        1112    7


btxsgosb1#

Since Spark >= 3.2.0, F.session_window is available, and it works for both streaming and batch queries. The output differs slightly from yours: instead of a sessionId we get a session_window column.

from pyspark.sql import functions as F

temp = [
  ('Alice', 1),
  ('Alice', 60),
  ('Alice', 160),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1111),
  ('Alice', 1112),
  ('Bob', 3),
  ('Alice', 2),
  ('Bob', 2),
  ('Alice', 3),
  ('Bob', 1)
]

temp_df = spark.createDataFrame(temp, ['user', 'ev_timestamp_sec'])
temp_df = temp_df.withColumn(
    'ev_timestamp', F.timestamp_seconds('ev_timestamp_sec')
)
# For Structured Streaming, a watermark on the event-time column is required;
# it bounds how late data may arrive and lets the engine finalize session windows.
temp_df = temp_df.withWatermark('ev_timestamp', '5 minutes')

sess_window = F.session_window(
    timeColumn='ev_timestamp', gapDuration='10 minutes'
)
agg_cols = [
    F.min('ev_timestamp').alias('startTime'),
    F.max('ev_timestamp').alias('endTime'),
    F.count('*').alias('count')
]
sessions_df = temp_df.groupBy('user', sess_window).agg(*agg_cols)
sessions_df.show(3, False)

# +-----+------------------------------------------+-------------------+-------------------+-----+
# |user |session_window                            |startTime          |endTime            |count|
# +-----+------------------------------------------+-------------------+-------------------+-----+
# |Alice|{1970-01-01 00:00:01, 1970-01-01 00:12:40}|1970-01-01 00:00:01|1970-01-01 00:02:40|5    |
# |Alice|{1970-01-01 00:18:31, 1970-01-01 00:28:32}|1970-01-01 00:18:31|1970-01-01 00:18:32|7    |
# |Bob  |{1970-01-01 00:00:01, 1970-01-01 00:10:03}|1970-01-01 00:00:01|1970-01-01 00:00:03|3    |
# +-----+------------------------------------------+-------------------+-------------------+-----+
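
If you need flat columns rather than the struct, the session_window struct (fields: start and end) can be unpacked after the aggregation. A minimal sketch; the flat_df name is just illustrative:

# Unpack the session_window struct into flat columns (works in batch or streaming).
flat_df = sessions_df.select(
    'user',
    F.col('session_window.start').alias('sessionStart'),
    F.col('session_window.end').alias('sessionEnd'),
    'startTime', 'endTime', 'count'
)
flat_df.show(3, False)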

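The same groupBy can be used in an actual streaming query. A minimal sketch using the built-in rate source as a stand-in for a real stream; the source, the derived user column, and the console sink are placeholder assumptions:

from pyspark.sql import functions as F

# Rate source: a built-in test source that emits (timestamp, value) rows.
stream_df = (
    spark.readStream
    .format('rate')
    .option('rowsPerSecond', 10)
    .load()
    .withColumn('user', (F.col('value') % 3).cast('string'))  # fake user key
)

sessions_stream = (
    stream_df
    .withWatermark('timestamp', '5 minutes')
    .groupBy('user', F.session_window('timestamp', '10 minutes'))
    .agg(
        F.min('timestamp').alias('startTime'),
        F.max('timestamp').alias('endTime'),
        F.count('*').alias('count')
    )
)

query = (
    sessions_stream.writeStream
    .outputMode('append')   # append mode requires the watermark set above
    .format('console')
    .option('truncate', False)
    .start()
)

With the append output mode, a session is emitted only once the watermark passes its end, i.e. once it can no longer be extended by late events.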
