pyspark-获取1和0序列的第一个值

r7xajy2e  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(482)

我要第一个 indicator 并为每组ID创建一个新的指示符。将有0的长序列,但1序列的第一个1需要有一个称为 first_indicator .

dataframe=spark.createDataFrame([("B2", "2019-11-19 12:07:38", 1), ("B2", "2019-11-19 12:24:25", 1), 
                          ("B2", "2019-11-19 12:37:58", 0), ("B2", "2019-11-19 12:55:08", 1),
                          ("B2", "2019-11-19 13:07:28", 1), ("B2", "2019-11-19 13:20:28", 0),
                          ("F9", "2020-02-02 13:06:36", 0), ("F9", "2020-02-02 13:21:37", 1), 
                          ("F9", "2020-02-02 13:36:38", 1), ("F9", "2020-02-02 13:45:32", 0),
                          ("F9", "2020-02-02 14:06:32", 1), ("F9", "2020-02-02 14:24:31", 1)], 
                          ["id", "date_time", "indicator"]).show()

+---+-------------------+---------+
| id|          date_time|indicator|
+---+-------------------+---------+
| B2|2019-11-19 12:07:38|        1|
| B2|2019-11-19 12:24:25|        1|
| B2|2019-11-19 12:37:58|        0|
| B2|2019-11-19 12:55:08|        1|
| B2|2019-11-19 13:07:28|        1|
| B2|2019-11-19 13:20:28|        0|
| F9|2020-02-02 13:06:36|        0|
| F9|2020-02-02 13:21:37|        1|
| F9|2020-02-02 13:36:38|        1|
| F9|2020-02-02 13:45:32|        0|
| F9|2020-02-02 14:06:32|        1|
| F9|2020-02-02 14:24:31|        1|
+---+-------------------+---------+

所需Dataframe:

+---+-------------------+---------+---------------+
| id|          date_time|indicator|first_indicator|
+---+-------------------+---------+---------------+
| B2|2019-11-19 12:07:38|        1|              1|
| B2|2019-11-19 12:24:25|        1|              0|
| B2|2019-11-19 12:37:58|        0|              0|
| B2|2019-11-19 12:55:08|        1|              1|
| B2|2019-11-19 13:07:28|        1|              0|
| B2|2019-11-19 13:20:28|        0|              0|
| F9|2020-02-02 13:06:36|        0|              0|
| F9|2020-02-02 13:21:37|        1|              1|
| F9|2020-02-02 13:36:38|        1|              0|
| F9|2020-02-02 13:45:32|        0|              0|
| F9|2020-02-02 14:06:32|        1|              1|
| F9|2020-02-02 14:24:31|        1|              0|
+---+-------------------+---------+---------------+
ifsvaxew

ifsvaxew1#

我建议您按“id”分组,并将“date\u time”和“indicator”收集在一个列表中,这样您就可以得到如下结果:

+---+---------------------------------------------------------+
| id|                           array                         |
+---+---------------------------------------------------------+
| B2|[(2019-11-19 12:07:38, 1), (2019-11-19 12:24:25, 1) ... ]|
| F9|[(2020-02-02 13:06:36, 0), (2020-02-02 13:21:37, 0) ... ]|
+---+---------------------------------------------------------+

接下来,您可以构建自己的自定义项,返回第一个指标的记录。在这个udf中,您不需要处理Dataframe,因此要应用的算法考虑起来更“自然”。

y0u0uwnf

y0u0uwnf2#

您可以使用窗口按Dataframe进行分区和排序,然后使用lag函数比较前一个值为0和当前值为1。

w = Window.partitionBy('id').orderBy('date_time')

df.withColumn('target', ((lag('indicator', 1, 0).over(w) == 0) & (col('indicator') == 1)).cast('int')).show()

+---+-------------------+---------+------+
| id|          date_time|indicator|target|
+---+-------------------+---------+------+
| B2|2019-11-19 12:07:38|        1|     1|
| B2|2019-11-19 12:24:25|        1|     0|
| B2|2019-11-19 12:37:58|        0|     0|
| B2|2019-11-19 12:55:08|        1|     1|
| B2|2019-11-19 13:07:28|        1|     0|
| B2|2019-11-19 13:20:28|        0|     0|
| F9|2020-02-02 13:06:36|        0|     0|
| F9|2020-02-02 13:21:37|        1|     1|
| F9|2020-02-02 13:36:38|        1|     0|
| F9|2020-02-02 13:45:32|        0|     0|
| F9|2020-02-02 14:06:32|        1|     1|
| F9|2020-02-02 14:24:31|        1|     0|
+---+-------------------+---------+------+

相关问题