PySpark: How to convert a date range in a DataFrame into a new DataFrame

vxqlmq5t  posted on 2022-12-17 in Spark

I have a PySpark DataFrame with only one row:

spark_session_tbl_df.printSchema()

spark_session_tbl_df.show()   

root
 |-- strm: string (nullable = true)
 |-- acad_career: string (nullable = true)
 |-- session_code: string (nullable = true)
 |-- sess_begin_dt: timestamp (nullable = true)
 |-- sess_end_dt: timestamp (nullable = true)
 |-- census_dt: timestamp (nullable = true)

+----+-----------+------------+-------------------+-------------------+-------------------+
|strm|acad_career|session_code|      sess_begin_dt|        sess_end_dt|          census_dt|
+----+-----------+------------+-------------------+-------------------+-------------------+
|2228|       UGRD|           1|2022-08-20 00:00:00|2022-12-03 00:00:00|2022-09-19 00:00:00|
+----+-----------+------------+-------------------+-------------------+-------------------+

I am trying to produce output like the following, where each row is a 7-day range/sequence:

+-------------------+-------------------+
|      sess_begin_dt|        sess_end_dt|
+-------------------+-------------------+
|2022-08-20         |2022-08-27         |
|2022-08-28         |2022-09-04         |
|2022-09-05         |2022-09-12         |
|2022-09-13         |2022-09-20         |
|2022-09-21         |2022-09-28         |
|        ...        |        ...        |
|2022-11-26         |2022-12-03         |
+-------------------+-------------------+

I tried the approach below, but I am not sure whether it can reference the PySpark DataFrame, or whether I need a different approach to produce the desired output above.

from pyspark.sql.functions import sequence, to_date, explode, col

date_range_df = spark.sql("SELECT sequence(to_date('sess_begin_dt'), to_date('sess_end_dt'), interval 7 day) as date").withColumn("date", explode(col("date")))
date_range_df.show()
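(For reference, a minimal sketch of the same sequence idea that does reference the DataFrame directly, instead of passing column names as string literals to to_date; it assumes spark_session_tbl_df is the one-row DataFrame shown above, and the 7-day step and +7 end offset only mirror the sample rows, so they may need adjusting:)

from pyspark.sql import functions as F

weekly_df = (
    spark_session_tbl_df
    # sequence() accepts timestamp bounds and an interval step (Spark 2.4+)
    .select(
        F.explode(
            F.expr("sequence(sess_begin_dt, sess_end_dt, interval 7 days)")
        ).alias("sess_begin_dt"))
    # keep only the date part and derive the end of each 7-day range
    .withColumn("sess_begin_dt", F.to_date("sess_begin_dt"))
    .withColumn("sess_end_dt", F.date_add("sess_begin_dt", 7))
)
weekly_df.show()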

omhiaaxx1#

One way to handle time series is to convert the dates to timestamps, solve the problem numerically, and only at the end convert back to dates.

from pyspark.sql import functions as F

data = [['2022-08-20 00:00:00', '2022-12-03 00:00:00']]
df = spark.createDataFrame(data = data, schema = ['start', 'end'])

week_seconds = 7*24*60*60
(
    df
    # convert the date strings to unix seconds
    .withColumn('start_timestamp', F.unix_timestamp('start'))
    .withColumn('end_timestamp', F.unix_timestamp('end'))
    # one row per 7-day step between start and end
    .select(
        F.explode(
            F.sequence('start_timestamp', 'end_timestamp', F.lit(week_seconds)))
        .alias('start_date'))
    # back to dates; each range ends 6 days after it starts
    .withColumn('start_date', F.to_date(F.from_unixtime('start_date')))
    .withColumn('end_date', F.date_add('start_date', 6))
).show()

+----------+----------+
|start_date|  end_date|
+----------+----------+
|2022-08-20|2022-08-26|
|2022-08-27|2022-09-02|
|2022-09-03|2022-09-09|
|2022-09-10|2022-09-16|
|2022-09-17|2022-09-23|
+----------+----------+
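The same numeric approach should carry over to the original DataFrame by reading the timestamps from its own columns rather than the literal data above; a sketch, assuming spark_session_tbl_df from the question:

from pyspark.sql import functions as F

week_seconds = 7 * 24 * 60 * 60
(
    spark_session_tbl_df
    # the timestamp columns convert to unix seconds just like the strings above
    .withColumn('start_timestamp', F.unix_timestamp('sess_begin_dt'))
    .withColumn('end_timestamp', F.unix_timestamp('sess_end_dt'))
    .select(
        F.explode(
            F.sequence('start_timestamp', 'end_timestamp', F.lit(week_seconds)))
        .alias('start_date'))
    .withColumn('start_date', F.to_date(F.from_unixtime('start_date')))
    .withColumn('end_date', F.date_add('start_date', 6))
).show()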
