Scala interpolation of a time series in Spark

wecizke3 · posted 2021-05-27 in Spark

I need to interpolate a time series in Scala.

The original data is:

2020-08-01, value1
2020-08-03, value3

I want to insert rows for the intermediate dates, like this:

2020-08-01, value1
2020-08-02, value2
2020-08-03, value3

where value2 is the linear interpolation of value1 and value3.

Can anyone share sample code that does this in Scala Spark? For performance reasons I would rather avoid UDFs and use spark.range instead, but I am open to your best solution.

Thank you!


cczfrluj · answer #1

0. You can group to get the min and max dates from the DataFrame, then build a sequence between them and explode it to get one row per date.

from pyspark.sql.functions import *

# Get the min/max date, build the full calendar between them, explode it
# into one row per day, then left-join the original data back so the
# missing dates appear with null values.

df.groupBy().agg(min('date').alias('date_min'), max('date').alias('date_max')) \
  .withColumn('date', sequence(to_date('date_min'), to_date('date_max'))) \
  .withColumn('date', explode('date')) \
  .select('date') \
  .join(df, ['date'], 'left') \
  .show(10, False)

+----------+-----+
|date      |value|
+----------+-----+
|2020-08-01|0    |
|2020-08-02|null |
|2020-08-03|null |
|2020-08-04|null |
|2020-08-05|null |
|2020-08-06|10   |
+----------+-----+
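The range-building step can be checked in plain Python without a Spark session (a minimal sketch; `observed` is hypothetical sample data matching the table above):

```python
from datetime import date, timedelta

def date_range(start, end):
    """All dates from start to end inclusive, like sequence() + explode()."""
    return [start + timedelta(days=d) for d in range((end - start).days + 1)]

# Hypothetical sample matching the table: values on Aug 1 and Aug 6 only.
observed = {date(2020, 8, 1): 0, date(2020, 8, 6): 10}

# Left join: one row per date in the range, None where nothing was observed.
filled = [(d, observed.get(d)) for d in date_range(min(observed), max(observed))]
```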

1. The simplest version, just for your case.

from pyspark.sql.functions import *
from pyspark.sql import Window

# w1 looks backwards from each row, w2 looks forwards.
w1 = Window.orderBy('date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
w2 = Window.orderBy('date').rowsBetween(Window.currentRow, Window.unboundedFollowing)

# Carry the last non-null value forward (value_m1) and the next non-null
# value backward (value_p1), then fill each null with their midpoint.
# Note the parentheses: `value_m1 + value_p1 / 2` would divide only value_p1.
df.withColumn("value_m1",  last('value', ignorenulls=True).over(w1)) \
  .withColumn("value_p1", first('value', ignorenulls=True).over(w2)) \
  .withColumn('value', coalesce(col('value'), expr('(value_m1 + value_p1) / 2'))) \
  .show(10, False)

+----------+-----+--------+--------+
|date      |value|value_m1|value_p1|
+----------+-----+--------+--------+
|2020-08-01|0.0  |0       |0       |
|2020-08-02|5.0  |0       |10      |
|2020-08-03|10.0 |10      |10      |
+----------+-----+--------+--------+
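A quick plain-Python check of the midpoint step. The two operator precedences happen to agree when value_m1 is 0, as in the table above, so a hypothetical non-zero left endpoint is used here to show that only the parenthesized form is the midpoint:

```python
v_m1, v_p1 = 4, 10                 # hypothetical non-zero left endpoint
no_parens = v_m1 + v_p1 / 2        # precedence binds the division first: 9.0
midpoint = (v_m1 + v_p1) / 2       # intended linear interpolation: 7.0
```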

2. A small improvement to handle arbitrary runs of null days. For example, when the DataFrame consists of:

+----------+-----+
|date      |value|
+----------+-----+
|2020-08-01|0    |
|2020-08-02|null |
|2020-08-03|null |
|2020-08-04|null |
|2020-08-05|null |
|2020-08-06|10   |
|2020-08-07|null |
|2020-08-08|null |
+----------+-----+

then the code should be changed as follows:

from pyspark.sql.functions import *
from pyspark.sql import Window

# w1/w2: everything up to / from the current row, ordered by date.
w1 = Window.orderBy('date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
w2 = Window.orderBy('date').rowsBetween(Window.currentRow, Window.unboundedFollowing)
# w3/w4: rank rows within each gap, counted from either end.
w3 = Window.partitionBy('days_m1').orderBy('date')
w4 = Window.partitionBy('days_p1').orderBy(desc('date'))

# value_m1/value_p1: the nearest non-null value before/after each row.
# days_m1/days_p1: first used as gap ids (running count of non-nulls in
# each direction), then overwritten with the row distance to that nearest
# non-null neighbour. Each null becomes the distance-weighted average of
# its two neighbours; rows missing a neighbour on one side stay null.
df.withColumn("value_m1",  last('value', ignorenulls=True).over(w1)) \
  .withColumn("value_p1", first('value', ignorenulls=True).over(w2)) \
  .withColumn('days_m1', count(when(col('value').isNotNull(), 1)).over(w1)) \
  .withColumn('days_p1', count(when(col('value').isNotNull(), 1)).over(w2)) \
  .withColumn('days_m1', count(lit(1)).over(w3) - 1) \
  .withColumn('days_p1', count(lit(1)).over(w4) - 1) \
  .withColumn('value', coalesce(col('value'), expr('(days_p1 * value_m1 + days_m1 * value_p1) / (days_m1 + days_p1)'))) \
  .orderBy('date') \
  .show(10, False)

+----------+-----+--------+--------+-------+-------+
|date      |value|value_m1|value_p1|days_m1|days_p1|
+----------+-----+--------+--------+-------+-------+
|2020-08-01|0.0  |0       |0       |0      |0      |
|2020-08-02|2.0  |0       |10      |1      |4      |
|2020-08-03|4.0  |0       |10      |2      |3      |
|2020-08-04|6.0  |0       |10      |3      |2      |
|2020-08-05|8.0  |0       |10      |4      |1      |
|2020-08-06|10.0 |10      |10      |0      |0      |
|2020-08-07|null |10      |null    |1      |1      |
|2020-08-08|null |10      |null    |2      |0      |
+----------+-----+--------+--------+-------+-------+
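The distance-weighted formula can be verified in plain Python (a minimal sketch, independent of Spark; `interpolate` is a hypothetical helper and the input list mirrors the sample value column above):

```python
def interpolate(series):
    """Fill None gaps with distance-weighted linear interpolation between
    the nearest non-None neighbours. Entries with no neighbour on one side
    (leading/trailing Nones) stay None, as in the answer's last two rows."""
    out = list(series)
    known = [i for i, v in enumerate(series) if v is not None]
    for i, v in enumerate(series):
        if v is not None:
            continue
        prev = max((k for k in known if k < i), default=None)
        nxt = min((k for k in known if k > i), default=None)
        if prev is None or nxt is None:
            continue  # no neighbour on one side: leave as None
        d_m1, d_p1 = i - prev, nxt - i  # distances to the two neighbours
        out[i] = (d_p1 * series[prev] + d_m1 * series[nxt]) / (d_m1 + d_p1)
    return out

# Sample column from the table: 0, four nulls, 10, two trailing nulls.
print(interpolate([0, None, None, None, None, 10, None, None]))
# → [0, 2.0, 4.0, 6.0, 8.0, 10, None, None]
```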
