The following code works fine in batch mode, but fails in a streaming query with the infamous AnalysisException:

Non-time-based windows are not supported on streaming DataFrames/Datasets
from pyspark.sql.functions import *
from pyspark.sql.window import Window

temp = [
    ('Alice', 1),
    ('Alice', 60),
    ('Alice', 160),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1111),
    ('Alice', 1112),
    ('Bob', 3),
    ('Alice', 2),
    ('Bob', 2),
    ('Alice', 3),
    ('Bob', 1)
]
temp_df = spark.createDataFrame(temp, ["user", "ev_timestamp"])

maxSessionDuration = 60 * 10  # Max session duration of 10 minutes.
client_fp_time_window = Window.partitionBy("user").orderBy("ev_timestamp")

# Mark an event as the start of a new session when the gap to the user's
# previous event is at least maxSessionDuration, then assign session ids
# with a running sum of those markers.
rowsWithSessionIds = temp_df \
    .select("user", "ev_timestamp",
            lag("ev_timestamp", 1).over(client_fp_time_window).alias("prevEvTimestamp")) \
    .select("user", "ev_timestamp",
            when(
                (col("ev_timestamp").cast('long') - col("prevEvTimestamp").cast('long')) < maxSessionDuration, 0) \
            .otherwise(1).alias("isNewSession")
            ) \
    .select("user", "ev_timestamp",
            sum("isNewSession").over(client_fp_time_window).alias("sessionId"))
display(rowsWithSessionIds)

# Collapse the events into one row per (user, sessionId).
sessionsDF = rowsWithSessionIds \
    .groupBy("user", "sessionId") \
    .agg(min("ev_timestamp").alias("startTime"),
         max("ev_timestamp").alias("endTime"),
         count("*").alias("count")) \
    .alias('Session')
display(sessionsDF)
I understand this is because the lag() function is not supported in streaming queries. The recommended alternative is the mapGroupsWithState() method, but that is limited to Scala/Java.
How can this be achieved in PySpark? Or what other alternatives are there for sessionizing a stream with PySpark Structured Streaming?
The desired output for each batch looks like this:
user   sessionId  startTime  endTime  count
Bob    1          1          3        3
Alice  1          1          160      5
Alice  2          1111       1112     7
1 Answer
Since Spark >= 3.2.0, F.session_window is available, and it can be used for both streaming and batch queries. The output is slightly different from yours: instead of a sessionId you get a session_window struct column with start and end fields.