我正在使用一个PySpark DataFrame,它包含列'ID','date'和'bool'。'bool'是一个指示符。
#+-------+-------------+----+
#| ID | date |bool|
#+-------+-------------+----+
#|1092829| 2019-10-29 | 0 |
#|3091902| 2019-12-14 | 1 |
#|3091902| 2020-07-13 | 0 |
#|1092829| 2020-07-15 | 1 |
#|1092829| 2020-07-17 | 1 |
#|1092829| 2020-08-18 | 1 |
#| ... | ... | ...|
#+-------+-------------+----+
字符串
为了简化,让我们假设我在变量ID上划分 Dataframe ,更清楚地说,我只考虑ID 1092829。请注意,DataFrame是根据日期按升序排列的。
#+-------+-------------+----+
#| ID | date |bool|
#+-------+-------------+----+
#|1092829| 2019-10-29 | 0 |
#|1092829| 2019-12-14 | 1 |
#|1092829| 2020-07-15 | 1 |
#|1092829| 2020-07-17 | 1 |
#|1092829| 2020-07-19 | 0 |
#|1092829| 2020-08-15 | 1 |
#|1092829| 2020-09-10 | 0 |
#|1092829| 2020-09-15 | 0 |
#|1092829| 2020-09-20 | 1 |
#| ... | ... | ...|
#+-------+-------------+----+
型
我想为X行创建一个列'D',如果Y行(位于X行下方)的变量bool等于1,并且X行和Y行之间的日期差小于2周,否则为0。
这将产生以下 Dataframe
#+-------+-------------+----+---+
#| ID | date |bool| D |
#+-------+-------------+----+---+
#|1092829| 2019-10-29 | 0 | 0 |
#|1092829| 2019-12-14 | 1 | 0 |
#|1092829| 2020-07-15 | 1 | 1 |
#|1092829| 2020-07-17 | 1 | 0 |
#|1092829| 2020-07-19 | 0 | 0 |
#|1092829| 2020-08-15 | 1 | 0 |
#|1092829| 2020-09-10 | 0 | 1 |
#|1092829| 2020-09-15 | 0 | 0 |
#|1092829| 2020-09-20 | 1 | 0 |
#| ... | ... | ...|...|
#+-------+-------------+----+---+
型
我知道如何使用pyspark lag函数来实现固定行数,但它远非最佳,因为在 Dataframe 中可能有10行观察结果,跨度不到2周。
有两个滞后的示例:
import pyspark.sql.functions as F
df = df.withColumn("D", F.when((F.datediff(df.date, F.lag(df.date,-1))<14) & (F.lag(df.bool,-1)==1)\
| (F.datediff(df.date, F.lag(df.date,-2))<14) & (F.lag(df.bool,-2)==1),1).otherwise(0))
型
我怎样才能以一种计算效率高的方式对任意数量的行进行计算呢?
2条答案
按热度按时间8fsztsew1#
我认为这个逻辑是可能的。在2周范围内计数bool = 1,如果小于1,则为目标值。
字符串
hmtdttj42#
我以你为例做了一些测试。这是我的想法
字符串
对于您的情况,使用Window函数既简单又合适。该窗口是通过根据“ID”列对DataFrame进行分区来构造的。这将保证在应用滞后时,不会有两个不同的“ID”值将它们的日期/布尔值混淆。在窗口中,您需要为datediff操作按升序对日期进行排序。
型
结果 Dataframe :
型
我希望这能回答你的问题。