让我们考虑以下输入数据
| incremental_id | session_start_id | session_end_id | items_bought |
|----------------|------------------|----------------|--------------|
| 1 | a | b | 1 |
| 2 | z | t | 7 |
| 3 | b | c | 0 |
| 4 | c | d | 3 |
哪里:
每行代表一个用户会话
每个会话记录一个开始/结束会话id
我们知道前3行与同一个用户关联,因为'session\u end\u id=session\u start\u id'。第四行与第二个用户相关
我希望能够汇总上述数据,以便获得:
第一个顾客买了4件东西
第二个顾客买了7件东西
如何在pyspark(或者最终在纯sql)中实现这一点?我想避免在pyspark中使用udf,但如果这是唯一的方法,那也没关系。
谢谢你的帮助!
edit:我已经更新了示例dataframe,不能单独使用'incremental\u id'将行排序为连续会话
3条答案
按热度按时间evrscar21#
这是一个Pypark版本
332nm8kg2#
公共表表达式是sql:1999.
使用cte,我们可以使用下面的查询
说明:
在锚查询中,选择所有没有父记录的记录(i、 例如,没有其他记录以当前会话\u start \u id)结束
递归地,从表中将cte的session\u end\u id与session\u start\u id连接起来。
将记录分组并返回结果。
sql小提琴link:http://sqlfiddle.com/#!4/ac98a/4/0型
(注:在小提琴中使用oracle。但是任何支持cte的db引擎都应该工作)。
hgc7kmma3#
如果您能够访问临时表创建和受影响的行计数元数据,则可以移植此: