我有一个spark结构化流作业,它将从Kafka那里获取数据,并使用forEachBatch将数据保存到Neo4j中,如下所示:
StreamingQuery query = eventsDf
.writeStream()
.queryName("streaming")
.outputMode("update")
.trigger(Trigger.ProcessingTime(80000))
.foreachBatch(
(VoidFunction2<Dataset<Row>, Long>) (dfBatch, batchId) -> {
}
)
.option("checkpointLocation", "src/main/resources/checkpoint")
.start();
eventsDf由聚合和筛选的数据组成。(另外,我没有时间戳列来使用窗口/水印)
对于第一个批次,它将有数据,80秒后,将有第二个批次,其中包括由于max
函数的聚合而产生的上一个批次的数据。我需要什么:
1.放弃上一批数据
1.检查点位置的State
文件夹中存在的数据大小持续增加State
文件夹由增量文件和检查点文件组成即使应用了minDeltasForSnapshot
配置,文件也会持续增加
我尝试了不同的Spark配置:
"spark.sql.streaming.minBatchesToRetain", 2
"spark.sql.streaming.stateStore.minDeltasForSnapshot", 2
"cleanSource", "delete"
"spark.sql.streaming.forceDeleteTempCheckpointLocation", true
更新:
我的代码中有groupby
和aggregation
,所以我使用.withColumn()
添加了batchId作为新列,然后执行了groupby
和aggregation
。在此更改之后,我的增量文件没有持续增长(能够解决此问题),但我的snapshot
文件持续增长。
如何预防这种情况?
1条答案
按热度按时间63lcw9qa1#
我正在状态文件(即增量和快照文件)中存储虚拟数据,这将阻止我的状态增长**(通过进行这些更改,Spark结构化流作业将成为无状态作业)**
以下代码片段位于
forEachBatch
函数内部ExpressionEncoder<Row> batchEncoder = RowEncoder.apply(batchSchema());
,其中batchSchema()返回数据类型为SchemaType
的变量Map函数:
StateData
是我的Map类: