How to remember the previous batch in Spark Streaming to compute a cumulative sum

Asked by ttcibm8c on 2021-05-29 in Hadoop

I am working on a Spark Streaming project in which I have to compute the cumulative sum of a DataFrame column. I managed to compute the cumulative sum by following this link, but Spark only computes the sum within a single batch; the next batch starts again from zero. I need the logic to carry over from one batch to the next. How can I store all incoming data, or remember the previous Spark batch, so that the cumulative sum continues across batches?
Output of batch 1

+---------------+-------------------+----+----+----+
|     product_id|          date_time| ack|val1|val2|
+---------------+-------------------+----+----+----+
|4008607333T.upf|2017-12-13:02:27:01|3-46|  50|  50|
|4008607333T.upf|2017-12-13:02:27:03|3-46|  60| 110|
+---------------+-------------------+----+----+----+

Output of batch 2
+---------------+-------------------+----+----+----+
|     product_id|          date_time| ack|val1|val2|
+---------------+-------------------+----+----+----+
|4008607333T.upf|2017-12-13:03:27:01|3-46|  30|  30|
|4008607333T.upf|2017-12-13:03:27:03|3-46|  20|  50|
+---------------+-------------------+----+----+----+

It should instead be

Output of batch 2
+---------------+-------------------+----+----+----+
|     product_id|          date_time| ack|val1|val2|
+---------------+-------------------+----+----+----+
|4008607333T.upf|2017-12-13:03:27:01|3-46|  30| 140|
|4008607333T.upf|2017-12-13:03:27:03|3-46|  20| 160|
+---------------+-------------------+----+----+----+

That is, the running total should carry over from batch 1: 110 + 30 = 140, then 140 + 20 = 160.

Spark code

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Running total per (product_id, ack) key, ordered by event time.
// Note: this window only sees rows within the current micro-batch.
val w = Window.partitionBy($"product_id", $"ack")
  .orderBy($"date_time")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
val newDf = inputDF.withColumn("val_sum", sum($"val1").over(w))
  .withColumn("val2_sum", sum($"val2").over(w))
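For reference, one common way to carry a running total across micro-batches is a stateful operator such as Structured Streaming's flatMapGroupsWithState. The sketch below is only an illustration under assumptions, not code from the post: Event, WithSum, and the streaming Dataset events are hypothetical names, and it relies on the fact that date_time strings in the yyyy-MM-dd:HH:mm:ss format shown above sort chronologically.

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._  // assumes a SparkSession named `spark` is in scope

case class Event(product_id: String, date_time: String, ack: String, val1: Long)
case class WithSum(product_id: String, date_time: String, ack: String,
                   val1: Long, val2: Long)

// events: a streaming Dataset[Event] parsed from the source (hypothetical).
// For each (product_id, ack) key, keep the last cumulative total in the
// state store and extend it with the rows of the current micro-batch.
val withRunningSum = events
  .groupByKey(e => (e.product_id, e.ack))
  .flatMapGroupsWithState[Long, WithSum](
    OutputMode.Append, GroupStateTimeout.NoTimeout) { (_, rows, state) =>
      var total = state.getOption.getOrElse(0L)
      // rows within a micro-batch are not guaranteed to arrive ordered
      val out = rows.toSeq.sortBy(_.date_time).map { e =>
        total += e.val1
        WithSum(e.product_id, e.date_time, e.ack, e.val1, total)
      }
      state.update(total)  // remembered for the next batch
      out.iterator
  }

For the state to survive restarts, the query also needs a checkpoint location, e.g. writeStream.option("checkpointLocation", ...). On the older DStream API, updateStateByKey plays the same role.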

No answers yet.
