PySpark: find users who re-engage each month

Posted by new9mtju on 2021-05-26 in Spark

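I have a large DataFrame that looks like the one below, and I need the number of re-engaged users per month, i.e. users who did not visit in the previous month but came back this month.
Comparing two specific months is easy. How can I do this for all months more efficiently than going month by month?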

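from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(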
    [
        ("2020-05-06", "1"),
        ("2020-05-07", "1"),
        ("2020-05-08", "2"),
        ("2020-05-10", "3"),
        ("2020-05-07", "3"),
        ("2020-05-07", "1"),
        ("2020-05-20", "4"),
        ("2020-05-30", "2"),
        ("2020-05-03", "1"),
        ("2020-06-06", "1"),
        ("2020-06-07", "1"),
        ("2020-06-08", "5"),
        ("2020-06-10", "3"),
        ("2020-06-07", "3"),
        ("2020-06-07", "1"),
        ("2020-06-20", "3"),
        ("2020-06-30", "5"),
        ("2020-07-03", "2"),
        ("2020-07-06", "4"),
        ("2020-07-07", "4"),
        ("2020-07-08", "2"),
        ("2020-07-10", "3"),
        ("2020-07-07", "3"),
        ("2020-07-07", "4"),
        ("2020-07-20", "3"),
        ("2020-07-30", "2"),
        ("2020-08-03", "1"),
        ("2020-08-03", "2"),
        ("2020-08-06", "5"),
        ("2020-08-07", "4"),
        ("2020-08-08", "2"),
        ("2020-08-10", "3"),
        ("2020-08-07", "3"),
        ("2020-08-07", "4"),
        ("2020-08-20", "3"),
        ("2020-08-30", "2"),
        ("2020-08-03", "1"),
    ],
    ["visit_date", "userId"],
)

df = df.withColumn("first_day_month", F.trunc("visit_date", "month")).withColumn(
    "first_day_last_month", F.expr("add_months(first_day_month, -1)")
)
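`F.trunc(..., "month")` maps every visit date to the first day of its month, and `add_months(..., -1)` steps that date back one month. A plain-Python sketch of the same two operations, useful for checking values by hand (the helper names `first_day_month` and `add_months_first_day` are hypothetical, not Spark APIs):

```python
from datetime import date


def first_day_month(d: date) -> date:
    """Plain-Python analogue of F.trunc(col, "month"): same year and month, day 1."""
    return d.replace(day=1)


def add_months_first_day(d: date, n: int) -> date:
    """Shift a first-of-month date by n months (n may be negative),
    like add_months(first_day_month, n) for dates that fall on day 1."""
    total = d.year * 12 + (d.month - 1) + n
    return date(total // 12, total % 12 + 1, 1)


m = first_day_month(date(2020, 5, 6))
print(m, add_months_first_day(m, -1))  # 2020-05-01 2020-04-01
```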

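# Visits in May, and visits in June with userId renamed to avoid a column clash.
s5 = df.where(F.col("first_day_month") == "2020-05-01")
s6 = df.where(F.col("first_day_month") == "2020-06-01").withColumnRenamed(
    "userId", "userId_right"
)

# The right join keeps every June row; a null May userId means the user did not visit in May.
ss = s5.join(s6, s5.userId == s6.userId_right, how="right")
ss.select("userId_right").where(F.col("userId").isNull()).show()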

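Spark array operations also look worth trying, but they would have to be applied row by row.
I am not very familiar with array computations and not sure whether running it this way would be efficient: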

dd = (
    df.groupby("first_day_month")
    # collect_set drops repeat visits; array_sort makes the element order deterministic.
    .agg(F.array_sort(F.collect_set("userId")).alias("users_current_month"))
    .orderBy("first_day_month")
)
dd.show()

+---------------+-------------------+
|first_day_month|users_current_month|
+---------------+-------------------+
|     2020-05-01|       [1, 2, 3, 4]|
|     2020-06-01|          [1, 3, 5]|
|     2020-07-01|          [2, 3, 4]|
|     2020-08-01|    [1, 2, 3, 4, 5]|
+---------------+-------------------+
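The row-by-row array idea can be checked with a plain-Python sketch that uses the per-month user sets from the output above: pair each month with the previous row and keep the users missing from it. In Spark this would presumably map to `F.lag("users_current_month")` over `Window.orderBy("first_day_month")`, then `F.array_except` (available since Spark 2.4) and `F.size`; that mapping is an untested assumption. Since there is only one row per month, an unpartitioned window is probably acceptable here, even though Spark moves all rows to a single partition for it. The names `rows`, `month_index` and `reengaged_per_month` are hypothetical.

```python
# One row per month with the distinct users seen that month, as in the
# collect output above (months written as (year, month) tuples).
rows = [
    ((2020, 5), {"1", "2", "3", "4"}),
    ((2020, 6), {"1", "3", "5"}),
    ((2020, 7), {"2", "3", "4"}),
    ((2020, 8), {"1", "2", "3", "4", "5"}),
]


def month_index(ym):
    year, month = ym
    return year * 12 + month


def reengaged_per_month(rows):
    """Pair each month with the previous row (what lag() returns over a window
    ordered by month) and keep the users that are missing from that row."""
    rows = sorted(rows, key=lambda r: r[0])
    result = []
    for (prev_month, prev_users), (month, users) in zip(rows, rows[1:]):
        # If a calendar month had no visitors at all, the previous row is not
        # last month, so compare against an empty set instead.
        if month_index(month) - month_index(prev_month) != 1:
            prev_users = set()
        result.append((month, sorted(users - prev_users)))
    return result


for month, users in reengaged_per_month(rows):
    print(month, len(users), users)
# (2020, 6) 1 ['5']
# (2020, 7) 2 ['2', '4']
# (2020, 8) 2 ['1', '5']
```

The counts match the expected result in the question.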

Any ideas?
Expected result:

first_day_month    reengaged_user_count
2020-06-01         1
2020-07-01         2
2020-08-01         2
Answer #1, by yqhsw0fo:

Using analytic (window) functions, we can do the following:

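from pyspark.sql import Window
from pyspark.sql import functions as F

# For each visit, look up the month of the same user's previous visit.
df = df.withColumn("first_day_month", F.trunc("visit_date", "month")).withColumn(
    "first_day_last_month",
    F.lag("first_day_month").over(Window.partitionBy("userId").orderBy("visit_date")),
)
# A gap of more than one month means the user skipped last month and came back.
# Rows with no previous visit (null lag) are dropped by this filter.
ss = df.where(F.months_between("first_day_month", "first_day_last_month") > 1)
ss.show()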
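A plain-Python sketch of what the `lag` window computes, assuming the visits have first been reduced to distinct (userId, month) pairs; the dictionary below was derived by hand from the sample data, and `visits_by_user`, `month_index` and `returning_after_gap` are hypothetical names. Deduplicating first (for example with `distinct()` on `userId` and `first_day_month` in Spark) should shrink the input to the window step, though that is a suggestion rather than part of the answer.

```python
# Distinct visit months per user, derived from the sample data in the question.
visits_by_user = {
    "1": [(2020, 5), (2020, 6), (2020, 8)],
    "2": [(2020, 5), (2020, 7), (2020, 8)],
    "3": [(2020, 5), (2020, 6), (2020, 7), (2020, 8)],
    "4": [(2020, 5), (2020, 7), (2020, 8)],
    "5": [(2020, 6), (2020, 8)],
}


def month_index(ym):
    year, month = ym
    return year * 12 + month


def returning_after_gap(visits_by_user):
    """Mimic lag(month) over partitionBy(userId).orderBy(month): return
    (month, userId) for each visit month that comes more than one month
    after the user's previous visit month."""
    out = []
    for user, months in visits_by_user.items():
        months = sorted(set(months))
        for prev, cur in zip(months, months[1:]):
            if month_index(cur) - month_index(prev) > 1:
                out.append((cur, user))
    return sorted(out)


print(returning_after_gap(visits_by_user))
# [((2020, 7), '2'), ((2020, 7), '4'), ((2020, 8), '1'), ((2020, 8), '5')]
```

This reproduces the four rows shown above.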

+----------+------+---------------+--------------------+
|visit_date|userId|first_day_month|first_day_last_month|
+----------+------+---------------+--------------------+
|2020-08-06|     5|     2020-08-01|          2020-06-01|
|2020-08-03|     1|     2020-08-01|          2020-06-01|
|2020-07-06|     4|     2020-07-01|          2020-05-01|
|2020-07-03|     2|     2020-07-01|          2020-05-01|
+----------+------+---------------+--------------------+

ss.groupBy("first_day_month").agg(F.collect_set("UserId")).show()

+---------------+-------------------+
|first_day_month|collect_set(UserId)|
+---------------+-------------------+
|     2020-08-01|             [1, 5]|
|     2020-07-01|             [2, 4]|
+---------------+-------------------+
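This result has no 2020-06-01 row, while the expected result counts one user there: user 5's June visit is their first ever, so `lag` returns null and the filter drops it. The plain-Python sketch below counts users per month (in Spark, `F.countDistinct("userId")` instead of `collect_set` would give the count) and, with a hypothetical `count_new_users` flag, also treats a first-ever month as re-engaged except in the earliest month of the data, which reproduces the expected result. In Spark that variant might be expressed by also keeping rows where `F.col("first_day_last_month").isNull()`; this is an untested assumption.

```python
from collections import Counter

# Distinct visit months per user, derived from the sample data in the question.
visits_by_user = {
    "1": [(2020, 5), (2020, 6), (2020, 8)],
    "2": [(2020, 5), (2020, 7), (2020, 8)],
    "3": [(2020, 5), (2020, 6), (2020, 7), (2020, 8)],
    "4": [(2020, 5), (2020, 7), (2020, 8)],
    "5": [(2020, 6), (2020, 8)],
}


def month_index(ym):
    year, month = ym
    return year * 12 + month


def reengaged_user_count(visits_by_user, count_new_users=False):
    """Count, per month, users whose previous visit month is more than one
    month earlier. With count_new_users=True, a user's first-ever month (where
    lag() would be null) also counts, except in the earliest month of the data."""
    first_month = min(m for months in visits_by_user.values() for m in months)
    counts = Counter()
    for months in visits_by_user.values():
        prev = None
        for cur in sorted(set(months)):
            if prev is None:
                if count_new_users and cur != first_month:
                    counts[cur] += 1
            elif month_index(cur) - month_index(prev) > 1:
                counts[cur] += 1
            prev = cur
    # Months with zero re-engaged users do not appear in the result.
    return dict(sorted(counts.items()))


print(reengaged_user_count(visits_by_user))
# {(2020, 7): 2, (2020, 8): 2}
print(reengaged_user_count(visits_by_user, count_new_users=True))
# {(2020, 6): 1, (2020, 7): 2, (2020, 8): 2}
```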
