使用rocksdb时,flink valuestate将在过期后从存储中删除?

yruzcnhs  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(614)

我正在使用flink版本1.10.1和rocksdb后端。
我知道rocksdb使用“托管内存”中的内存,我没有为托管内存设置任何特定的值。这是Flink做的。
当我监视我的应用程序时,taskmanagers的可用内存总是在减少(我指的是通过 free -h ). 我怀疑原因可能是rocksdb。
问题1=>如果 ValueState 的值过期,则rocksdb将从其内存中删除并从localstorage目录中删除(我的存储容量也有限)
问题2=> stream.keyBy(ipAddress) ,如果 ipAddress 将由rocksdb持有(我说的是key本身而不是状态),它是否总是放在托管内存中?如果不是,那么flink堆内存会增加吗?
以下是我的应用程序的总体结构:

streamA = source.filter(..);
streamA2 = source2.filter(..);
streamB = streamA.keyBy(ipAddr).window().process(); // contains value state
streamC = streamA.keyBy(ipAddr).flatMap(..); // contains value state
streamD = streamA2.keyBy(ipAddr).window.process(); // contains value state
streamE = streamA.union(streamA2).keyBy(ipAddr)....

以下是我的应用程序中的状态示例:

private transient ValueState<SampleObject> sampleState;
 StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(10))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<SampleObject> sampleValueStateDescriptor = new ValueStateDescriptor<>(
                "sampleState",
                TypeInformation.of(SampleObject.class)
        );
        sampleValueStateDescriptor.enableTimeToLive(ttlConfig);

rocksdb配置:

state.backend: rocksdb
state.backend.rocksdb.checkpoint.transfer.thread.num: 6
state.backend.rocksdb.localdir: /pathTo/checkpoint_only_local

为什么我要用rocksdb

我使用rocksdb是因为我有一个巨大的密钥大小(想想它的ip地址),heapstate后端或其他服务器无法处理。
我的应用程序使用rocksdb,因为我在用户定义的keyedprocessfunction中有一堆状态供将来决定(每个状态都有`statettlconfig)

注意

我的应用程序不需要增量检查点或任何关于savepoint的东西。我不关心保存应用程序的所有快照。

t2a7ltrp

t2a7ltrp1#

使用rocksdb时,flink valuestate将在过期后从存储中删除?
是的,但不是马上(在一些早期版本的flink中,答案是“视情况而定”。)
在状态ttl配置中,您没有指定希望如何完成状态清理。在这种情况下,过期值在读取时被显式删除(例如 ValueState#value )在后台定期垃圾收集。对于rocksdb,背景清理是在压实过程中完成的。换句话说,清理工作并不是立即进行的。这些文档提供了关于如何优化的更多细节——您可以将清理配置为更快地完成,代价是性能降低。
键本身不使用任何状态。key selector函数用于对流进行分区,但是密钥不会与keyby一起存储。只有windows和flatmap操作是keeping state,即每个键的状态,所有这些键的状态都将在rocksdb中(除非您已将计时器配置为在堆中,这是一个选项,但flink 1.10计时器默认存储在堆外的rocksdb中)。
你可以改变主意 flatmapKeyedProcessFunction 并使用计时器显式清除状态键的状态—这将使您能够直接控制清除状态的确切时间,而不是依赖于状态ttl机制来最终清除状态。
但更有可能的是,Windows正在形成相当大的状态。如果您可以切换到进行预聚合(通过 reduce 或者 aggregate )这可能会有很大帮助。

相关问题