Apache Spark 如何丢弃存储在检查点目录state文件夹中的以前的批量聚合数据?

jhdbpxl9  于 2022-11-16  发布在  Apache
关注(0)|答案(1)|浏览(102)

我有一个spark结构化流作业,它将从Kafka那里获取数据,并使用forEachBatch将数据保存到Neo4j中,如下所示:

StreamingQuery query = eventsDf
        .writeStream()
        .queryName("streaming")
        .outputMode("update")
        .trigger(Trigger.ProcessingTime(80000))
        .foreachBatch(
                (VoidFunction2<Dataset<Row>, Long>) (dfBatch, batchId) -> {
                }
        )
        .option("checkpointLocation", "src/main/resources/checkpoint")
        .start();

eventsDf由聚合和筛选的数据组成。(另外,我没有时间戳列来使用窗口/水印)
对于第一个批次,它将有数据,80秒后,将有第二个批次,其中包括由于max函数的聚合而产生的上一个批次的数据。我需要什么:
1.放弃上一批数据
1.检查点位置的State文件夹中存在的数据大小持续增加State文件夹由增量文件和检查点文件组成即使应用了minDeltasForSnapshot配置,文件也会持续增加
我尝试了不同的Spark配置:

"spark.sql.streaming.minBatchesToRetain", 2
"spark.sql.streaming.stateStore.minDeltasForSnapshot", 2
"cleanSource", "delete"
"spark.sql.streaming.forceDeleteTempCheckpointLocation", true

更新

我的代码中有groupbyaggregation,所以我使用.withColumn()添加了batchId作为新列,然后执行了groupbyaggregation。在此更改之后,我的增量文件没有持续增长(能够解决此问题),但我的snapshot文件持续增长。
如何预防这种情况?

63lcw9qa

63lcw9qa1#

我正在状态文件(即增量和快照文件)中存储虚拟数据,这将阻止我的状态增长**(通过进行这些更改,Spark结构化流作业将成为无状态作业)**
以下代码片段位于forEachBatch函数内部

dfBatch
.groupByKey((MapFunction<Row, String>) (row) -> row.getAs("id"), Encoders.STRING())
.flatMapGroupsWithState(mappingFunction, OutputMode.Update(), Encoders.bean(StateData.class), batchEncoder, ProcessingTimeTimeout())

ExpressionEncoder<Row> batchEncoder = RowEncoder.apply(batchSchema());,其中batchSchema()返回数据类型为SchemaType的变量
Map函数:

private FlatMapGroupsWithStateFunction<String, Row, StateData, Row> mappingFunction = (key, value, state) -> {
        if (state.hasTimedOut()) {
            state.remove();
        } else if (state.exists()) {
            StateData existingState = state.get();
            Map<String, Integer> data = existingState.getData();
            int existingCount = data.getOrDefault(key, 0);
            data.put(key, existingCount + 1);
            existingState.setData(data);
            state.update(existingState);
        } else {
            Map<String, Integer> data = new HashMap<>();
            data.put(key, 1);
            StateData newState = new StateData(data);
            state.update(newState);
        }
        return value;
    };

StateData是我的Map类:

package com.spark;

import java.util.Map;

public class StateData {
    Map<String, Integer> data;

    public StateData(Map<String, Integer> data) {
        this.data = data;
    }

    public Map<String, Integer> getData() {
        return data;
    }

    public void setData(Map<String, Integer> data) {
        this.data = data;
    }
}

相关问题