I am working on a Spark Streaming project and need to compute the cumulative sum of a DataFrame column. I have already managed to compute the cumulative sum using this link, but Spark only computes the sum within a single batch; the next batch starts again from zero. I need the logic to carry over from the previous batch into the next one. How can I store all incoming data, or remember the previous Spark batch, so that the cumulative sum keeps running?
Output of batch 1
+---------------+-------------------+----+----+----+
| product_id| date_time| ack|val1|val2|
+---------------+-------------------+----+----+----+
|4008607333T.upf|2017-12-13:02:27:01|3-46| 50| 50|
|4008607333T.upf|2017-12-13:02:27:03|3-46| 60| 110|
+---------------+-------------------+----+----+----+
Output of batch 2
+---------------+-------------------+----+----+----+
| product_id| date_time| ack|val1|val2|
+---------------+-------------------+----+----+----+
|4008607333T.upf|2017-12-13:03:27:01|3-46| 30| 30|
|4008607333T.upf|2017-12-13:03:27:03|3-46| 20| 50|
+---------------+-------------------+----+----+----+
It should instead be (expected output of batch 2, continuing from the running total of 110 reached in batch 1):
+---------------+-------------------+----+----+----+
|     product_id|          date_time| ack|val1|val2|
+---------------+-------------------+----+----+----+
|4008607333T.upf|2017-12-13:03:27:01|3-46|  30| 140|
|4008607333T.upf|2017-12-13:03:27:03|3-46|  20| 160|
+---------------+-------------------+----+----+----+
Spark code
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Running sum within the current batch only, per (product_id, ack) key
val w = Window.partitionBy($"product_id", $"ack")
  .orderBy($"date_time")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

val newDf = inputDF.withColumn("val_sum", sum($"val1").over(w))
  .withColumn("val2_sum", sum($"val2").over(w))