pyspark - How do I give an index to each user's events while preserving order?

6za6bjd0 posted on 2023-01-25 in Spark

I have the following DataFrame:

+------------+------------------+--------------------+
|id          |install_time_first|           timestamp|
+------------+------------------+--------------------+
|           2|        2022-02-02|2022-02-01 10:03:...|
|           3|        2022-02-01|2022-02-01 10:00:...|
|           2|        2022-02-02|                null|
|           3|        2022-02-01|2022-02-03 11:35:...|
|           1|        2022-02-01|                null|
|           2|        2022-02-02|2022-02-02 10:05:...|
|           3|        2022-02-01|2022-02-01 10:05:...|
|           4|        2022-02-02|                null|
|           1|        2022-02-01|2022-02-01 10:05:...|
|           2|        2022-02-02|2022-02-02 10:05:...|
|           4|        2022-02-02|2022-02-03 11:35:...|
|           1|        2022-02-01|                null|
|           1|        2022-02-01|2022-02-01 10:03:...|
|           1|        2022-02-01|2022-02-01 10:05:...|
|           4|        2022-02-02|2022-02-03 11:35:...|
|           2|        2022-02-02|2022-02-02 11:00:...|
|           4|        2022-02-02|2022-02-03 11:35:...|
|           3|        2022-02-01|2022-02-04 11:35:...|
|           1|        2022-02-01|2022-02-01 10:00:...|
+------------+------------------+--------------------+

I want to sort the DataFrame by install_time_first and add an index per user (applied to all of that user's events), preserving the order:

+------------+------------------+--------------------+-----+
|id          |install_time_first|           timestamp|index|
+------------+------------------+--------------------+-----+
|           1|        2022-02-01|                null|    1|
|           1|        2022-02-01|                null|    1|
|           1|        2022-02-01|2022-02-01 10:00:...|    1|
|           1|        2022-02-01|2022-02-01 10:03:...|    1|
|           1|        2022-02-01|2022-02-01 10:05:...|    1|
|           1|        2022-02-01|2022-02-01 10:05:...|    1|
|           3|        2022-02-01|2022-02-01 10:00:...|    2|
|           3|        2022-02-01|2022-02-01 10:05:...|    2|
|           3|        2022-02-01|2022-02-03 11:35:...|    2|
|           3|        2022-02-01|2022-02-04 11:35:...|    2|
|           2|        2022-02-02|                null|    3|
|           2|        2022-02-02|2022-02-01 10:03:...|    3|
|           2|        2022-02-02|2022-02-02 10:05:...|    3|
|           2|        2022-02-02|2022-02-02 10:05:...|    3|
|           2|        2022-02-02|2022-02-02 11:00:...|    3|
|           4|        2022-02-02|                null|    4|
|           4|        2022-02-02|2022-02-03 11:35:...|    4|
|           4|        2022-02-02|2022-02-03 11:35:...|    4|
|           4|        2022-02-02|2022-02-03 11:35:...|    4|
+------------+------------------+--------------------+-----+

How can I do that? I haven't been able to do it while keeping the rows sorted.


eeq64g8w #1

The key observation here is that the "index" column takes the same value for rows with the same "id", and the indexes follow the order of "install_time_first". One way to look at this problem is to partition/orderBy on (install_time_first, id) and assign a unique index to each pair. I made two solutions: the first uses a join, the second uses windows plus a few tricks. I prefer the first one, because the second can be heavy on performance:

    • PS: you can remove the line .orderBy("install_time_first", "id") from both solutions; I added it only to make sure the output is sorted so it is easy to read.
    • Prepare the data:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lag, monotonically_increasing_id, sum, when

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([
    (2, "2022-02-02", "2022-02-01 10:03"),
    (3, "2022-02-01", "2022-02-01 10:00"),
    (2, "2022-02-02", None),
    (3, "2022-02-01", "2022-02-03 11:35"),
    (1, "2022-02-01", None),
    (2, "2022-02-02", "2022-02-02 10:05"),
    (3, "2022-02-01", "2022-02-01 10:05"),
    (4, "2022-02-02", None),
    (1, "2022-02-01", "2022-02-01 10:05"),
    (2, "2022-02-02", "2022-02-02 10:05"),
    (4, "2022-02-02", "2022-02-03 11:35"),
    (1, "2022-02-01", None),
    (1, "2022-02-01", "2022-02-01 10:03"),
    (1, "2022-02-01", "2022-02-01 10:05"),
    (4, "2022-02-02", "2022-02-03 11:35"),
    (2, "2022-02-02", "2022-02-02 11:00"),
    (4, "2022-02-02", "2022-02-03 11:35"),
    (3, "2022-02-01", "2022-02-04 11:35"),
    (1, "2022-02-01", "2022-02-01 10:00"),
], ("id", "install_time_first", "timestamp"))
    • Solution 1:
# Build one row per distinct (id, install_time_first) pair, sort the pairs,
# and give each pair an index.
df_with_index = df.select("id", "install_time_first").distinct()\
    .orderBy("install_time_first", "id")\
    .withColumn("index", monotonically_increasing_id() + 1)\
    .withColumnRenamed("id", "id2").withColumnRenamed("install_time_first", "install_time_first2")

# Join the per-pair index back onto the original events.
df.join(df_with_index, (df.id == df_with_index.id2) & (df.install_time_first == df_with_index.install_time_first2),
        "left").orderBy("install_time_first", "id").drop("id2", "install_time_first2").show()
    • Solution 2:
# w orders each id's rows so lag() can spot the first row of every id;
# w2 orders all rows globally (single partition) for the running sum.
w = Window.partitionBy(col("id")).orderBy(col("install_time_first"))
w2 = Window.orderBy(col("install_time_first"))

# Flag the first row of each id (no previous id, or the id changed),
# then turn the flags into an index with a cumulative sum.
df = df.withColumn("prev_id", lag("id", 1, None).over(w))
df.withColumn("index", when(df.prev_id.isNull() | (df.prev_id != df.id), 1).otherwise(0))\
    .withColumn("index", sum("index").over(w2.rowsBetween(Window.unboundedPreceding, Window.currentRow)))\
    .orderBy("install_time_first", "id").drop("prev_id").show()

Both give the same result:

+---+------------------+----------------+-----+
| id|install_time_first|       timestamp|index|
+---+------------------+----------------+-----+
|  1|        2022-02-01|2022-02-01 10:05|    1|
|  1|        2022-02-01|2022-02-01 10:00|    1|
|  1|        2022-02-01|            null|    1|
|  1|        2022-02-01|            null|    1|
|  1|        2022-02-01|2022-02-01 10:03|    1|
|  1|        2022-02-01|2022-02-01 10:05|    1|
|  3|        2022-02-01|2022-02-03 11:35|    2|
|  3|        2022-02-01|2022-02-01 10:00|    2|
|  3|        2022-02-01|2022-02-04 11:35|    2|
|  3|        2022-02-01|2022-02-01 10:05|    2|
|  2|        2022-02-02|            null|    3|
|  2|        2022-02-02|2022-02-02 10:05|    3|
|  2|        2022-02-02|2022-02-02 10:05|    3|
|  2|        2022-02-02|2022-02-02 11:00|    3|
|  2|        2022-02-02|2022-02-01 10:03|    3|
|  4|        2022-02-02|            null|    4|
|  4|        2022-02-02|2022-02-03 11:35|    4|
|  4|        2022-02-02|2022-02-03 11:35|    4|
|  4|        2022-02-02|2022-02-03 11:35|    4|
+---+------------------+----------------+-----+
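A side note on solution 1: monotonically_increasing_id guarantees increasing, unique values but not consecutive ones, so once the distinct pairs span multiple partitions the indexes may have gaps. A minimal sketch of a third option, not part of the original answer, that produces the same consecutive per-pair index with dense_rank applied to the DataFrame from the "Prepare the data" step (it shares the single-partition caveat of w2 in solution 2):

from pyspark.sql import Window
from pyspark.sql.functions import dense_rank

# Rank rows by their (install_time_first, id) pair; ties share a rank, so all
# events of one user get the same index and the indexes stay consecutive.
w3 = Window.orderBy("install_time_first", "id")  # no partitionBy: single partition
df.withColumn("index", dense_rank().over(w3))\
    .orderBy("install_time_first", "id").show()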
