Filling data for a date range in Spark

Asked by 9gm1akwq on 2021-05-27 · Spark

I have a sample dataset, and I want to fill in the missing dates with a quantity of 0, based on a start date and an end date (from 2016-01-01 to 2016-01-08).

```
id,date,quantity
1,2016-01-03,10
1,2016-01-04,20
1,2016-01-06,30
1,2016-01-07,20
2,2016-01-02,10
2,2016-01-03,10
2,2016-01-04,20
2,2016-01-06,20
2,2016-01-07,20
```

Based on the solution in the link below, I was able to get a partial solution: Fill in missing dates in a Spark dataframe column.
Can anyone suggest how to fill in every date from the start date through the end date for each id, so that the output looks like this?

```
id,date,quantity
1,2016-01-01,0
1,2016-01-02,0
1,2016-01-03,10
1,2016-01-04,20
1,2016-01-05,0
1,2016-01-06,30
1,2016-01-07,20
1,2016-01-08,0
2,2016-01-01,0
2,2016-01-02,10
2,2016-01-03,10
2,2016-01-04,20
2,2016-01-05,0
2,2016-01-06,20
2,2016-01-07,20
2,2016-01-08,0
```

eanckbw9 · Answer #1

For Spark 2.4+, use the `sequence` function to generate all the dates from 2016-01-01 to 2016-01-08, then left-join with the original dataframe and use `coalesce` to fill in the `quantity` and `id` values.

Example:

```python
from pyspark.sql.functions import lit

# One row per date in the range, with placeholder quantity and id values.
df1 = spark.sql("select explode(sequence(date('2016-01-01'), date('2016-01-08'), INTERVAL 1 DAY)) as date") \
    .withColumn("quantity", lit(0)) \
    .withColumn("id", lit(1))
```

```
df1.show()
+----------+--------+---+
|      date|quantity| id|
+----------+--------+---+
|2016-01-01|       0|  1|
|2016-01-02|       0|  1|
|2016-01-03|       0|  1|
|2016-01-04|       0|  1|
|2016-01-05|       0|  1|
|2016-01-06|       0|  1|
|2016-01-07|       0|  1|
|2016-01-08|       0|  1|
+----------+--------+---+
```

```
df.show()
+---+----------+--------+
| id|      date|quantity|
+---+----------+--------+
|  1|2016-01-03|      10|
|  1|2016-01-04|      20|
|  1|2016-01-06|      30|
|  1|2016-01-07|      20|
+---+----------+--------+
```

```python
from pyspark.sql.functions import coalesce, col

# For every column except date, prefer the real value from df and
# fall back to the zero-filled placeholder from df1.
exprs = ['date'] + [coalesce(col(f'df.{f}'), col(f'df1.{f}')).alias(f)
                    for f in df1.columns if f not in ['date']]

df1.alias("df1") \
    .join(df.alias("df"), ['date'], 'left') \
    .select(*exprs) \
    .orderBy("date") \
    .show()
```

```
+----------+--------+---+
|      date|quantity| id|
+----------+--------+---+
|2016-01-01|       0|  1|
|2016-01-02|       0|  1|
|2016-01-03|      10|  1|
|2016-01-04|      20|  1|
|2016-01-05|       0|  1|
|2016-01-06|      30|  1|
|2016-01-07|      20|  1|
|2016-01-08|       0|  1|
+----------+--------+---+
```

Update: to handle multiple ids, generate the date range per distinct id and join on both `id` and `date`:
```python
from pyspark.sql.functions import coalesce, col, lit

df = spark.createDataFrame(
    [(1, '2016-01-03', 10), (1, '2016-01-04', 20), (1, '2016-01-06', 30), (1, '2016-01-07', 20),
     (2, '2016-01-02', 10), (2, '2016-01-03', 10), (2, '2016-01-04', 20), (2, '2016-01-06', 20),
     (2, '2016-01-07', 20)],
    ["id", "date", "quantity"])

# One zero-quantity row per (id, date) for every date in the range.
df1 = df.selectExpr("id").distinct().selectExpr(
    "id", "explode(sequence(date('2016-01-01'), date('2016-01-08'), INTERVAL 1 DAY)) as date"
).withColumn("quantity", lit(0))

# Prefer the real value from df; fall back to the zero-filled value from df1.
exprs = [coalesce(col(f'df.{f}'), col(f'df1.{f}')).alias(f) for f in df1.columns]

df2 = df1.alias("df1").join(
    df.alias("df"),
    (col("df1.date") == col("df.date")) & (col("df1.id") == col("df.id")),
    'left'
).select(*exprs)

df2.orderBy("id", "date").show()
```

```
+---+----------+--------+
| id|      date|quantity|
+---+----------+--------+
|  1|2016-01-01|       0|
|  1|2016-01-02|       0|
|  1|2016-01-03|      10|
|  1|2016-01-04|      20|
|  1|2016-01-05|       0|
|  1|2016-01-06|      30|
|  1|2016-01-07|      20|
|  1|2016-01-08|       0|
|  2|2016-01-01|       0|
|  2|2016-01-02|      10|
|  2|2016-01-03|      10|
|  2|2016-01-04|      20|
|  2|2016-01-05|       0|
|  2|2016-01-06|      20|
|  2|2016-01-07|      20|
|  2|2016-01-08|       0|
+---+----------+--------+
```
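
If you would rather not hardcode the range at all, one possible variant (a sketch of my own, not part of the original answer, and with different semantics: each id is padded only between its own first and last date) derives the bounds from the data:

```python
from pyspark.sql.functions import coalesce, col, expr, lit

# Assumption: the range should come from the data itself, per id,
# rather than being the fixed window 2016-01-01..2016-01-08.
per_id_dates = df.groupBy("id").agg(
    expr("min(date) as start"), expr("max(date) as end")
).selectExpr(
    "id", "explode(sequence(to_date(start), to_date(end), INTERVAL 1 DAY)) as date"
)

per_id_dates.join(df, ["id", "date"], "left") \
    .select("id", "date", coalesce(col("quantity"), lit(0)).alias("quantity")) \
    .orderBy("id", "date").show()
```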


zi8p0yeb · Answer #2

If you want the nulls filled specifically with 0, `fillna` works well too.

```python
import pyspark.sql.functions as f

# Cross every distinct id with the full date range.
df2 = df.select('id').distinct() \
    .withColumn('date', f.expr("explode(sequence(date('2016-01-01'), date('2016-01-08'), INTERVAL 1 day))"))

df2.join(df, ['id', 'date'], 'left').fillna(0).orderBy('id', 'date').show(20, False)
```

```
+---+----------+--------+
|id |date      |quantity|
+---+----------+--------+
|1  |2016-01-01|0       |
|1  |2016-01-02|0       |
|1  |2016-01-03|10      |
|1  |2016-01-04|20      |
|1  |2016-01-05|0       |
|1  |2016-01-06|30      |
|1  |2016-01-07|20      |
|1  |2016-01-08|0       |
|2  |2016-01-01|0       |
|2  |2016-01-02|10      |
|2  |2016-01-03|10      |
|2  |2016-01-04|20      |
|2  |2016-01-05|0       |
|2  |2016-01-06|20      |
|2  |2016-01-07|20      |
|2  |2016-01-08|0       |
+---+----------+--------+
```
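
One caveat: `fillna(0)` fills nulls in every numeric column of the result. If the dataframe carried other nullable numeric columns, the `subset` parameter of `DataFrame.fillna` keeps the fill targeted:

```python
# Only fill nulls in quantity; leave any other columns untouched.
df2.join(df, ['id', 'date'], 'left').fillna(0, subset=['quantity']) \
    .orderBy('id', 'date').show(20, False)
```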
