How to perform a scan operation over key groups in a PySpark DataFrame

czq61nw1  posted on 2021-05-27 in Spark

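Experts, I am trying to perform a kind of scan operation on a PySpark DataFrame: each record's end date should be set based on the next record in its key group. This is what my DataFrame looks like: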

+---+----+----+-------------------+-------------------+
|Key|col1|col2|     effective_date|           end_date|
+---+----+----+-------------------+-------------------+
|  X| ABC| DEF|2020-08-01 00:00:00|2999-12-31 00:00:00|
|  X|ABC1|DEF1|2020-08-03 00:00:00|2999-12-31 00:00:00|
|  X|ABC2|DEF2|2020-08-05 00:00:00|2999-12-31 00:00:00|
|  Y| PQR| STU|2020-08-07 00:00:00|2999-12-31 00:00:00|
|  Y|PQR1|STU1|2020-08-09 00:00:00|2999-12-31 00:00:00|
+---+----+----+-------------------+-------------------+

Expected output:

+---+----+----+-------------------+-------------------+
|Key|col1|col2|     effective_date|           end_date|
+---+----+----+-------------------+-------------------+
|  X| ABC| DEF|2020-08-01 00:00:00|2020-08-02 23:59:59|
|  X|ABC1|DEF1|2020-08-03 00:00:00|2020-08-04 23:59:59|
|  X|ABC2|DEF2|2020-08-05 00:00:00|2999-12-31 00:00:00|
|  Y| PQR| STU|2020-08-07 00:00:00|2020-08-08 23:59:59|
|  Y|PQR1|STU1|2020-08-09 00:00:00|2999-12-31 00:00:00|
+---+----+----+-------------------+-------------------+

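The records are grouped by the "Key" column, and I want to keep only one record per key group with an end date of "2999-12-31 00:00:00". All other records should be marked as expired, with the end date set to the next record's effective date minus 1 (one second, as in the expected output above) when the records are sorted by effective date. I tried the following: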

>>> from pyspark.sql import functions as F
>>> from pyspark.sql import Window
>>> w = Window.partitionBy("Key").orderBy("effective_date")
>>> df1=df.withColumn("end_date",F.date_sub(F.lead("effective_date").over(w), 1))

The output does not match the expected result. I am using Python 2.7 and Spark 2.2.

n6lpvg4x  #1

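Try using lead like this, subtracting one second from the next effective date and keeping the existing end_date when there is no next record: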

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w=Window().partitionBy("Key").orderBy("effective_date")

df.withColumn("lead", F.lead("effective_date").over(w))\
  .withColumn("end_date", F.when(F.col("lead").isNotNull(), F.expr("""lead - interval 1 second"""))\
                           .otherwise(F.col("end_date"))).drop("lead")\
  .orderBy("effective_date").show()

# +---+----+----+-------------------+-------------------+
# |Key|col1|col2|     effective_date|           end_date|
# +---+----+----+-------------------+-------------------+
# |  X| ABC| DEF|2020-08-01 00:00:00|2020-08-02 23:59:59|
# |  X|ABC1|DEF1|2020-08-03 00:00:00|2020-08-04 23:59:59|
# |  X|ABC2|DEF2|2020-08-05 00:00:00|2999-12-31 00:00:00|
# |  Y| PQR| STU|2020-08-07 00:00:00|2020-08-08 23:59:59|
# |  Y|PQR1|STU1|2020-08-09 00:00:00|2999-12-31 00:00:00|
# +---+----+----+-------------------+-------------------+
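
For context, the attempt in the question likely differs for two reasons: date_sub works in whole days and returns a date (so the time of day is lost), and lead returns null for the last record of each key group, which overwrites the 2999-12-31 value. The window logic can be checked without a cluster using a small plain-Python sketch. This is only an illustration of the same idea, not Spark code; the close_out helper and the FMT constant are hypothetical names introduced here, and the rows are copied from the question.

```python
from datetime import datetime, timedelta
from itertools import groupby
from operator import itemgetter

FMT = "%Y-%m-%d %H:%M:%S"  # hypothetical constant: timestamp format used in the question
OPEN_END = "2999-12-31 00:00:00"

# Sample rows copied from the question.
rows = [
    {"Key": "X", "col1": "ABC", "col2": "DEF", "effective_date": "2020-08-01 00:00:00", "end_date": OPEN_END},
    {"Key": "X", "col1": "ABC1", "col2": "DEF1", "effective_date": "2020-08-03 00:00:00", "end_date": OPEN_END},
    {"Key": "X", "col1": "ABC2", "col2": "DEF2", "effective_date": "2020-08-05 00:00:00", "end_date": OPEN_END},
    {"Key": "Y", "col1": "PQR", "col2": "STU", "effective_date": "2020-08-07 00:00:00", "end_date": OPEN_END},
    {"Key": "Y", "col1": "PQR1", "col2": "STU1", "effective_date": "2020-08-09 00:00:00", "end_date": OPEN_END},
]


def close_out(records):
    """Hypothetical helper: per Key, end each row one second before the next row starts."""
    result = []
    # This timestamp format sorts correctly as plain strings.
    ordered = sorted(records, key=itemgetter("Key", "effective_date"))
    for _, group in groupby(ordered, key=itemgetter("Key")):
        group = list(group)
        # Pair each row with its successor; the last row gets None,
        # which mirrors lead() returning null at the end of a partition.
        for current, nxt in zip(group, group[1:] + [None]):
            updated = dict(current)
            if nxt is not None:
                next_start = datetime.strptime(nxt["effective_date"], FMT)
                updated["end_date"] = (next_start - timedelta(seconds=1)).strftime(FMT)
            # Otherwise keep the existing end_date, like the otherwise() branch.
            result.append(updated)
    return result


closed = close_out(rows)
for row in closed:
    print(row["Key"], row["col1"], row["effective_date"], row["end_date"])
```

The None check plays the role of the when(...isNotNull()) / otherwise(...) pair in the Spark version: only rows that have a successor in their key group get a new end date.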
