我正在运行一个Flink有状态作业,其中检查点大小随着时间的推移不断增长,直到检查点成为问题。状态类型为-ListStateDescriptor
。下面是代码片段:
public class MessageMapper extends
KeyedProcessFunction<String, Message, Message> {
private transient ListState<Trip> trips;
@Override
public void open(Configuration parameters) {
ListStateDescriptor<Trip> tripDescriptor =
new ListStateDescriptor<>("trips", Trip.class);
trips = getRuntimeContext().getListState(tripDescriptor);
...
}
@Override
public void processElement(Message message, Context ctx,
Collector<Message> out) throws Exception {
List<Trip> tripList = new ArrayList<>();
trips.get().forEach(tripList::add);
// Depending on the message content, one of the following actions are taken on the state:
// - Add new entry to tripList
// - Remove existing entry from tripList
// Update state
trips.update(tripList);
}
字符串
}
在处理新消息时,trips
状态平均应包含3-5个条目,并不断添加和删除条目。我希望国家规模保持稳定。令我困惑的是,为什么检查点大小在几周/几个月内持续增长。数据分区所依据的唯一键的数目不变。
1.当执行List.update()
时,旧列表是从存储中清除还是保留(永久保留,因此会增加检查点大小)?
1.我是否应该设置TTL配置,即使我不关心数据是否保留数月。
- 谢谢-谢谢
1条答案
按热度按时间nkcskrwz1#
对于流中的每个唯一键,您将获得一个单独的状态。所以如果你有越来越多的唯一键,状态可以无限增长。
如果是这种情况,那么设置TTL将是解决问题的一种可行方法。
顺便说一句,以这种方式使用
ListState
有点滥用:)MapState
更好,但更复杂,有一个单独的ValueState
跟踪递增的键值。如果状态后端是RocksDB,则每个Map条目将成为其表中的一个单独行,因此添加和删除项更有效。