re-scalable键控流状态函数

vof42yt1  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(360)

我有下面的flink工作,在那里我尝试使用keyed stream stateful函数(mapstate)和后端类型rockdb,

environment
.addSource(consumer).name("MyKafkaSource").uid("kafka-id")
.flatMap(pojoMapper).name("MyMapFunction").uid("map-id")
.keyBy(new MyKeyExtractor())
.map(new MyRichMapFunction()).name("MyRichMapFunction").uid("rich-map-id")
.addSink(sink).name("MyFileSink").uid("sink-id")

MyricMapFunction是一个有状态函数,它扩展了richmapfunction,richmapfunction有以下代码,

public static class MyRichMapFunction extends RichMapFunction<MyEvent, MyEvent> {
    private transient MapState<String, Boolean> cache;
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, Boolean> descriptor =
                new MapStateDescriptor("seen-values", TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<Boolean>() {}));
        cache = getRuntimeContext().getMapState(descriptor);
    }
    @Override
    public MyEvent map(MyEvent value) throws Exception {
        if (cache.contains(value.getEventId())) {
            value.setIsSeenAlready(Boolean.TRUE);
            return value;
        }
        value.setIsSeenAlready(Boolean.FALSE);
        cache.put(value.getEventId(), Boolean.TRUE)
        return value;
    }
}

在将来,我想重新缩放并行度(从2到4),所以我的问题是,如何实现可重新缩放的键控状态,以便在更改并行度之后,我可以将相应的缓存键控数据获取到其相应的任务槽。我试图探索这个,在这里我发现了一个文档。据此,可以通过listscheckpointed接口实现可重扩展的操作员状态,listscheckpointed接口提供了snapshotstate/restorestate方法。但不确定如何实现可伸缩的键控状态(myrichmapfunction)?我是否需要为myrichmapfunction类实现listcheckpointed接口?如果是,我如何根据restorestate方法上的新并行密钥哈希重新分配缓存(我的mapstate将在启用ttl的情况下保存大量密钥,假设它在任何时间点都最多保存10亿个密钥)?有人能帮我一下吗?或者你能给我举个很好的例子吗。

wtlkbnrh

wtlkbnrh1#

你写的代码已经可以重新缩放了;flink的托管键控状态是可设计的。通过重新平衡对示例的键分配,可以重新调整键控状态的比例(您可以将keyed state视为切分键/值存储。从技术上讲,使用一致性哈希将密钥Map到密钥组,每个并行示例负责一些密钥组。重缩放只涉及在示例之间重新分配关键组。)
这个 ListCheckpointed 接口用于非键控上下文中使用的状态,因此它不适合您所做的操作。还要注意的是 ListCheckpointed 将在Flink1.11中被弃用,取而代之的是更一般的 CheckpointedFunction .
还有一件事:如果 MyKeyExtractor 正在键入 value.getEventId() ,那么你可以使用 ValueState<Boolean> 为了你的缓存,而不是 MapState<String, Boolean> . 这是因为对于keyed state,每个键都有一个单独的valuestate值。仅当需要为流中的每个键存储多个属性/值对时,才需要使用mapstate。
在flink文档的“实践培训”部分中讨论了其中的大部分内容,其中包括一个非常接近您所做工作的示例。

相关问题