我有一个df bookingDt
以及 arrivalDt
柱。我要找出这两个日期之间的所有日期。
示例代码:
df = spark.sparkContext.parallelize(
[Row(vyge_id=1000, bookingDt='2018-01-01', arrivalDt='2018-01-05')]).toDF()
diffDaysDF = df.withColumn("diffDays", datediff('arrivalDt', 'bookingDt'))
diffDaysDF.show()
代码输出:
+----------+----------+-------+--------+
| arrivalDt| bookingDt|vyge_id|diffDays|
+----------+----------+-------+--------+
|2018-01-05|2018-01-01| 1000| 4|
+----------+----------+-------+--------+
我尝试的是找出两个日期之间的天数,并使用 timedelta
功能和 explode
是的。
dateList = [str(bookingDt + timedelta(i)) for i in range(diffDays)]
预期产量:
基本上,我需要建立一个df,其中每个日期都有一个记录 bookingDt
以及 arrivalDt
,包含在内。
+----------+----------+-------+----------+
| arrivalDt| bookingDt|vyge_id|txnDt |
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-01|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-02|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-03|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-04|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01| 1000|2018-01-05|
+----------+----------+-------+----------+
4条答案
按热度按时间vulvrdjw1#
只要您使用的是sparkversion2.1或更高版本,您就可以利用这样一个事实:在使用spark2.1时,我们可以使用列值作为参数
pyspark.sql.functions.expr()
:创建一个长度等于的重复逗号伪字符串
diffDays
将此字符串拆分为','
把它变成一个大小的数组diffDays
使用pyspark.sql.functions.posexplode()
将此数组与其索引一起分解最后使用
pyspark.sql.functions.date_add()
将索引值天数添加到bookingDt
代码:sg2wtvxw2#
好吧,你可以跟着做。
创建仅包含日期的Dataframe:
dates_df
#从第一天到第二天bookingDt
最后一个arrivalDt
然后将这些df与between条件连接起来:它的工作速度可能比解决方案更快
explode
但是,您需要确定此df的开始和结束日期。10年后,df将只有3650张唱片,不用担心太多。a5g8bdjr3#
对于spark 2.4,可以使用sequence创建一个数组,其中包含
bookingDt
以及arrivalDt
. 然后可以分解这个数组。输出:
ifsvaxew4#
正如@vvg所建议的:
输出: