Flink检查点大小持续增长

qhhrdooz  于 2023-08-01  发布在  Apache
关注(0)|答案(1)|浏览(167)

我正在运行一个Flink有状态作业,其中检查点大小随着时间的推移不断增长,直到检查点成为问题。状态类型为-ListStateDescriptor。下面是代码片段:

public class MessageMapper extends
        KeyedProcessFunction<String, Message, Message> {
    private transient ListState<Trip> trips;

    @Override
    public void open(Configuration parameters) {
        ListStateDescriptor<Trip> tripDescriptor =
                new ListStateDescriptor<>("trips", Trip.class);
        trips = getRuntimeContext().getListState(tripDescriptor);
        ...
    }

    @Override
    public void processElement(Message message, Context ctx,
                               Collector<Message> out) throws Exception {

        List<Trip> tripList = new ArrayList<>();
        trips.get().forEach(tripList::add);

        // Depending on the message content, one of the following actions are taken on the state:
        // - Add new entry to tripList
        // - Remove existing entry from tripList

        // Update state
        trips.update(tripList);
    }

字符串
}
在处理新消息时,trips状态平均应包含3-5个条目,并不断添加和删除条目。我希望国家规模保持稳定。令我困惑的是,为什么检查点大小在几周/几个月内持续增长。数据分区所依据的唯一键的数目不变。
1.当执行List.update()时,旧列表是从存储中清除还是保留(永久保留,因此会增加检查点大小)?
1.我是否应该设置TTL配置,即使我不关心数据是否保留数月。

  • 谢谢-谢谢
nkcskrwz

nkcskrwz1#

对于流中的每个唯一键,您将获得一个单独的状态。所以如果你有越来越多的唯一键,状态可以无限增长。
如果是这种情况,那么设置TTL将是解决问题的一种可行方法。
顺便说一句,以这种方式使用ListState有点滥用:)MapState更好,但更复杂,有一个单独的ValueState跟踪递增的键值。如果状态后端是RocksDB,则每个Map条目将成为其表中的一个单独行,因此添加和删除项更有效。

相关问题