如何在pyspark(或最终sql)中跨不同列聚合值?

wfsdck30  于 2021-05-16  发布在  Spark
关注(0)|答案(3)|浏览(411)

让我们考虑以下输入数据

| incremental_id | session_start_id | session_end_id | items_bought |
|----------------|------------------|----------------|--------------|
| 1              | a                | b              | 1            |
| 2              | z                | t              | 7            |
| 3              | b                | c              | 0            |
| 4              | c                | d              | 3            |

哪里:
每行代表一个用户会话
每个会话记录一个开始/结束会话id
我们知道前3行与同一个用户关联,因为'session\u end\u id=session\u start\u id'。第四行与第二个用户相关
我希望能够汇总上述数据,以便获得:
第一个顾客买了4件东西
第二个顾客买了7件东西
如何在pyspark(或者最终在纯sql)中实现这一点?我想避免在pyspark中使用udf,但如果这是唯一的方法,那也没关系。
谢谢你的帮助!
edit:我已经更新了示例dataframe,不能单独使用'incremental\u id'将行排序为连续会话

evrscar2

evrscar21#

这是一个Pypark版本

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import *

# create a window over the full data so we can lag the session end id

win = Window().partitionBy().orderBy("incremental_id")

# This is logic to indicate a user change

df = df.withColumn('user_boundary', F.lag(F.col("session_end_id"), 1).over(win) != F.col("session_start_id"))
df = df.withColumn('user_boundary', F.when(F.col("user_boundary").isNull(), F.lit(False)).otherwise(F.col("user_boundary")))

# Now create an artificial user id

df = df.withColumn('user_id', F.sum(F.col("user_boundary").cast(IntegerType())).over(win))

# Aggregate

df.groupby('user_id').agg(F.sum(F.col("items_bought")).alias("total_bought")).show()

+-------+------------+
|user_id|total_bought|
+-------+------------+
|      0|           4|
|      1|           7|
+-------+------------+
332nm8kg

332nm8kg2#

公共表表达式是sql:1999.
使用cte,我们可以使用下面的查询

WITH cte(session_start_id, session_end_id, items_bought) AS (
  select session_start_id, session_end_id, items_bought from user_session where session_start_id not in (
    select session_end_id from user_session)
UNION ALL
select a.session_start_id, b.session_end_id, b.items_bought from cte a 
  inner join user_session b on a.session_end_id = b.session_start_id)
  select session_start_id, sum(items_bought) from cte group by (session_start_id)

说明:
在锚查询中,选择所有没有父记录的记录(i、 例如,没有其他记录以当前会话\u start \u id)结束
递归地,从表中将cte的session\u end\u id与session\u start\u id连接起来。
将记录分组并返回结果。
sql小提琴link:http://sqlfiddle.com/#!4/ac98a/4/0型
(注:在小提琴中使用oracle。但是任何支持cte的db引擎都应该工作)。

hgc7kmma

hgc7kmma3#

如果您能够访问临时表创建和受影响的行计数元数据,则可以移植此:

insert into #CTESubs
select
    session_start_id,
    session_end_id,
    items_bought
from #user_session
WHERE
    session_start_id not in (select session_end_id from #user_session)

while(@@ROWCOUNT <> 0)
begin
    insert into #CTESubs
    select distinct
        p.session_start_id,
        c.session_end_id,
        c.items_bought
    from #user_session c
        inner join #CTESubs p on c.session_start_id = p.session_end_id
    WHERE
        p.session_start_id not in (select session_end_id from #user_session) 
        and c.session_end_id not in (select session_end_id from #CTESubs)
end

select
    session_start_id,
    sum(items_bought) items_bought
from #CTESubs
group by 
    session_start_id;

相关问题