我正在使用flink版本1.10.1和rocksdb后端。
我知道rocksdb使用“托管内存”中的内存,我没有为托管内存设置任何特定的值。这是Flink做的。
当我监视我的应用程序时,taskmanagers的可用内存总是在减少(我指的是通过 free -h
). 我怀疑原因可能是rocksdb。
问题1=>如果 ValueState
的值过期,则rocksdb将从其内存中删除并从localstorage目录中删除(我的存储容量也有限)
问题2=> stream.keyBy(ipAddress)
,如果 ipAddress
将由rocksdb持有(我说的是key本身而不是状态),它是否总是放在托管内存中?如果不是,那么flink堆内存会增加吗?
以下是我的应用程序的总体结构:
streamA = source.filter(..);
streamA2 = source2.filter(..);
streamB = streamA.keyBy(ipAddr).window().process(); // contains value state
streamC = streamA.keyBy(ipAddr).flatMap(..); // contains value state
streamD = streamA2.keyBy(ipAddr).window.process(); // contains value state
streamE = streamA.union(streamA2).keyBy(ipAddr)....
以下是我的应用程序中的状态示例:
private transient ValueState<SampleObject> sampleState;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<SampleObject> sampleValueStateDescriptor = new ValueStateDescriptor<>(
"sampleState",
TypeInformation.of(SampleObject.class)
);
sampleValueStateDescriptor.enableTimeToLive(ttlConfig);
rocksdb配置:
state.backend: rocksdb
state.backend.rocksdb.checkpoint.transfer.thread.num: 6
state.backend.rocksdb.localdir: /pathTo/checkpoint_only_local
为什么我要用rocksdb
我使用rocksdb是因为我有一个巨大的密钥大小(想想它的ip地址),heapstate后端或其他服务器无法处理。
我的应用程序使用rocksdb,因为我在用户定义的keyedprocessfunction中有一堆状态供将来决定(每个状态都有`statettlconfig)
注意
我的应用程序不需要增量检查点或任何关于savepoint的东西。我不关心保存应用程序的所有快照。
1条答案
按热度按时间t2a7ltrp1#
使用rocksdb时,flink valuestate将在过期后从存储中删除?
是的,但不是马上(在一些早期版本的flink中,答案是“视情况而定”。)
在状态ttl配置中,您没有指定希望如何完成状态清理。在这种情况下,过期值在读取时被显式删除(例如
ValueState#value
)在后台定期垃圾收集。对于rocksdb,背景清理是在压实过程中完成的。换句话说,清理工作并不是立即进行的。这些文档提供了关于如何优化的更多细节——您可以将清理配置为更快地完成,代价是性能降低。键本身不使用任何状态。key selector函数用于对流进行分区,但是密钥不会与keyby一起存储。只有windows和flatmap操作是keeping state,即每个键的状态,所有这些键的状态都将在rocksdb中(除非您已将计时器配置为在堆中,这是一个选项,但flink 1.10计时器默认存储在堆外的rocksdb中)。
你可以改变主意
flatmap
到KeyedProcessFunction
并使用计时器显式清除状态键的状态—这将使您能够直接控制清除状态的确切时间,而不是依赖于状态ttl机制来最终清除状态。但更有可能的是,Windows正在形成相当大的状态。如果您可以切换到进行预聚合(通过
reduce
或者aggregate
)这可能会有很大帮助。