在PySpark DataFrame中基于日期差异创建二进制指示器列

jtoj6r0c  于 2023-08-02  发布在  Spark
关注(0)|答案(2)|浏览(108)

我正在使用一个PySpark DataFrame,它包含列'ID','date'和'bool'。'bool'是一个指示符。

#+-------+-------------+----+
 #|   ID  |     date    |bool|
 #+-------+-------------+----+
 #|1092829|  2019-10-29 |  0 |
 #|3091902|  2019-12-14 |  1 |
 #|3091902|  2020-07-13 |  0 |
 #|1092829|  2020-07-15 |  1 |
 #|1092829|  2020-07-17 |  1 |
 #|1092829|  2020-08-18 |  1 |
 #|  ...  |     ...     | ...|
 #+-------+-------------+----+

字符串
为了简化,让我们假设我在变量ID上划分 Dataframe ,更清楚地说,我只考虑ID 1092829。请注意,DataFrame是根据日期按升序排列的。

#+-------+-------------+----+
 #|   ID  |     date    |bool|
 #+-------+-------------+----+
 #|1092829|  2019-10-29 |  0 |
 #|1092829|  2019-12-14 |  1 |
 #|1092829|  2020-07-15 |  1 |
 #|1092829|  2020-07-17 |  1 |
 #|1092829|  2020-07-19 |  0 |
 #|1092829|  2020-08-15 |  1 |
 #|1092829|  2020-09-10 |  0 |
 #|1092829|  2020-09-15 |  0 |
 #|1092829|  2020-09-20 |  1 | 
 #|  ...  |     ...     | ...|
 #+-------+-------------+----+

我想为X行创建一个列'D',如果Y行(位于X行下方)的变量bool等于1,并且X行和Y行之间的日期差小于2周,否则为0。

这将产生以下 Dataframe

#+-------+-------------+----+---+
 #|   ID  |     date    |bool| D |
 #+-------+-------------+----+---+
 #|1092829|  2019-10-29 |  0 | 0 |
 #|1092829|  2019-12-14 |  1 | 0 |
 #|1092829|  2020-07-15 |  1 | 1 |
 #|1092829|  2020-07-17 |  1 | 0 |
 #|1092829|  2020-07-19 |  0 | 0 |
 #|1092829|  2020-08-15 |  1 | 0 |
 #|1092829|  2020-09-10 |  0 | 1 |
 #|1092829|  2020-09-15 |  0 | 0 |
 #|1092829|  2020-09-20 |  1 | 0 |
 #|  ...  |     ...     | ...|...|
 #+-------+-------------+----+---+


我知道如何使用pyspark lag函数来实现固定行数,但它远非最佳,因为在 Dataframe 中可能有10行观察结果,跨度不到2周。
有两个滞后的示例:

import pyspark.sql.functions as F

df = df.withColumn("D", F.when((F.datediff(df.date, F.lag(df.date,-1))<14) & (F.lag(df.bool,-1)==1)\
| (F.datediff(df.date, F.lag(df.date,-2))<14) & (F.lag(df.bool,-2)==1),1).otherwise(0))


我怎样才能以一种计算效率高的方式对任意数量的行进行计算呢?

8fsztsew

8fsztsew1#

我认为这个逻辑是可能的。在2周范围内计数bool = 1,如果小于1,则为目标值。

from pyspark.sql import functions as f
from pyspark.sql import Window

w = Window.partitionBy('ID').orderBy('timestamp').rangeBetween(0, 14 * 86400)
​
df.withColumn('timestamp', f.unix_timestamp('date', 'y-M-d')) \
  .withColumn('bool_in_2weeks', f.count(f.when(f.col('bool') == f.lit(1), True)).over(w)) \
  .withColumn('D', f.expr('bool = 1 and bool_in_2weeks > 1').cast('int')) \
  .show(truncate=False)
+-------+----+----------+----------+--------------+---+
|ID     |bool|date      |timestamp |bool_in_2weeks|D  |
+-------+----+----------+----------+--------------+---+
|1092829|0   |2019-10-29|1572307200|0             |0  |
|1092829|1   |2019-12-14|1576281600|1             |0  |
|1092829|1   |2020-07-15|1594771200|2             |1  |
|1092829|1   |2020-07-17|1594944000|1             |0  |
|1092829|0   |2020-07-19|1595116800|0             |0  |
|1092829|1   |2020-08-15|1597449600|1             |0  |
|1092829|1   |2020-09-10|1599696000|1             |0  |
+-------+----+----------+----------+--------------+---+

字符串

hmtdttj4

hmtdttj42#

我以你为例做了一些测试。这是我的想法

df = spark.createDataFrame(
    [
        {"ID": 1092829, "date": "2019-10-29", "bool": 0},
        {"ID": 1092829, "date": "2019-12-14", "bool": 1},
        {"ID": 1092829, "date": "2020-07-15", "bool": 1},
        {"ID": 1092829, "date": "2020-07-17", "bool": 1},
        {"ID": 1092829, "date": "2020-07-19", "bool": 0},
        {"ID": 1092829, "date": "2020-08-15", "bool": 1},
        {"ID": 1092829, "date": "2020-09-10", "bool": 1},
    ]
)

字符串
对于您的情况,使用Window函数既简单又合适。该窗口是通过根据“ID”列对DataFrame进行分区来构造的。这将保证在应用滞后时,不会有两个不同的“ID”值将它们的日期/布尔值混淆。在窗口中,您需要为datediff操作按升序对日期进行排序。

from pyspark.sql.window import Window
import pyspark.sql.functions as F

window = Window.partitionBy("ID").orderBy(F.col("ID").asc(), F.col("date").asc())

transformed_df = df.withColumn(
    "D",
    F.coalesce(
        # Apply lag on "date" by -1 (bring next row's value to current row's level) to do `datediff` for the 2-week condition.
        (F.datediff(F.lag("date", -1, None).over(window), F.col("date")) < 14)
        # Apply lag on "bool" by -1 to compare, for a given row, if the next row has a 1 or not.
        & (
            F.lag("bool", -1, None).over(window) == F.lit(1)
        ),  # The comparisons and the "&" produce a boolean column
        F.lit(
            False
        ),  # For the case when the last `date` value for a given "ID" is NULL because of the lag, we set it to False by default.
    ).cast(
        "integer"
    ),  # Cast the boolean result into an integer for a binary value.
)

transformed_df.show()


结果 Dataframe :

#+-------+----------+----+---+
#|     ID|      date|bool|  D|
#+-------+----------+----+---+
#|1092829|2019-10-29|   0|  0|
#|1092829|2019-12-14|   1|  0|
#|1092829|2020-07-15|   1|  1|
#|1092829|2020-07-17|   1|  0|
#|1092829|2020-07-19|   0|  0|
#|1092829|2020-08-15|   1|  0|
#|1092829|2020-09-10|   1|  0|
#+-------+----------+----+---+


我希望这能回答你的问题。

相关问题