我有一个示例数据集,我想根据开始日期和结束日期(从2016-01-01到2016-01-08)用0填充日期。
id,date,quantity
1,2016-01-03,10
1,2016-01-04,20
1,2016-01-06,30
1,2016-01-07,20
2,2016-01-02,10
2,2016-01-03,10
2,2016-01-04,20
2,2016-01-06,20
2,2016-01-07,20
基于下面链接的解决方案,我能够实现部分解决方案。在spark dataframe列中填充缺少的日期
有没有人可以建议如何填写从开始日期到结束日期的日期,甚至是从开始日期到结束日期。
id,date,quantity
1,2016-01-01,0
1,2016-01-02,0
1,2016-01-03,10
1,2016-01-04,20
1,2016-01-05,0
1,2016-01-06,30
1,2016-01-07,20
1,2016-01-08,0
2,2016-01-01,0
2,2016-01-02,10
2,2016-01-03,10
2,2016-01-04,20
2,2016-01-05,0
2,2016-01-06,20
2,2016-01-07,20
2,2016-01-08,0
2条答案
按热度按时间eanckbw91#
从
Spark-2
.4使用sequence
函数生成所有日期2016-01-01 to 2016-01--08
.然后加入原始Dataframe使用
coalesce
得到quantity and id
价值观。Example:
```df1=sql("select explode(sequence(date('2016-01-01'),date('2016-01-08'),INTERVAL 1 DAY)) as date").
withColumn("quantity",lit(0)).
withColumn("id",lit(1))
df1.show()
+----------+--------+---+
| date|quantity| id|
+----------+--------+---+
|2016-01-01| 0| 1|
|2016-01-02| 0| 1|
|2016-01-03| 0| 1|
|2016-01-04| 0| 1|
|2016-01-05| 0| 1|
|2016-01-06| 0| 1|
|2016-01-07| 0| 1|
|2016-01-08| 0| 1|
+----------+--------+---+
df.show()
+---+----------+--------+
| id| date|quantity|
+---+----------+--------+
| 1|2016-01-03| 10|
| 1|2016-01-04| 20|
| 1|2016-01-06| 30|
| 1|2016-01-07| 20|
+---+----------+--------+
from pyspark.sql.functions import *
from pyspark.sql.types import *
exprs=['date']+[coalesce(col('df.'f'{f}'),col('df1.'f'{f}')).alias(f) for f in df1.columns if f not in ['date']]
df1.
alias("df1").
join(df.alias("df"),['date'],'left').
select(*exprs).
orderBy("date").
show()
+----------+--------+---+
| date|quantity| id|
+----------+--------+---+
|2016-01-01| 0| 1|
|2016-01-02| 0| 1|
|2016-01-03| 10| 1|
|2016-01-04| 20| 1|
|2016-01-05| 0| 1|
|2016-01-06| 30| 1|
|2016-01-07| 20| 1|
|2016-01-08| 0| 1|
+----------+--------+---+
`Update:`
df=spark.createDataFrame([(1,'2016-01-03',10),(1,'2016-01-04',20),(1,'2016-01-06',30),(1,'2016-01-07',20),(2,'2016-01-02',10),(2,'2016-01-03',10),(2,'2016-01-04',20),(2,'2016-01-06',20),(2,'2016-01-07',20)],["id","date","quantity"])
df1=df.selectExpr("id").distinct().selectExpr("id","explode(sequence(date('2016-01-01'),date('2016-01-08'),INTERVAL 1 DAY)) as date").withColumn("quantity",lit(0))
from pyspark.sql.functions import *
from pyspark.sql.types import *
exprs=[coalesce(col('df.'f'{f}'),col('df1.'f'{f}')).alias(f) for f in df1.columns]
df2=df1.alias("df1").join(df.alias("df"),(col("df1.date") == col("df.date"))& (col("df1.id") == col("df.id")),'left').select(*exprs)
df2.orderBy("id","date").show()
+---+----------+--------+
| id| date|quantity|
+---+----------+--------+
| 1|2016-01-01| 0|
| 1|2016-01-02| 0|
| 1|2016-01-03| 10|
| 1|2016-01-04| 20|
| 1|2016-01-05| 0|
| 1|2016-01-06| 30|
| 1|2016-01-07| 20|
| 1|2016-01-08| 0|
| 2|2016-01-01| 0|
| 2|2016-01-02| 10|
| 2|2016-01-03| 10|
| 2|2016-01-04| 20|
| 2|2016-01-05| 0|
| 2|2016-01-06| 20|
| 2|2016-01-07| 20|
| 2|2016-01-08| 0|
+---+----------+--------+
zi8p0yeb2#
如果要将空值具体填充为0,则
fillna
也不错。