Flink:如何存储状态并在另一个流中使用?

eulz3vhy  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(339)

我有一个flink的用例,我需要从一个文件中读取信息,存储每一行,然后使用这个状态过滤另一个流。
我现在正在和 connect 操作员和a RichCoFlatMapFunction ,但感觉太复杂了。还有,我担心 flatMap2 可以在从文件加载所有状态之前开始执行:

fileStream
    .connect(partRecordStream.keyBy((KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId()))
    .keyBy((KeySelector<String, String>) partId -> partId, (KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId())
    .flatMap(new RichCoFlatMapFunction<String, PartRecord, PartRecord>() {
        private transient ValueState<String> storedPartId;
        @Override
        public void flatMap1(String partId, Collector<PartRecord> out) throws Exception {
            // store state
            storedPartId.update(partId);
        }

        @Override
        public void flatMap2(PartRecord record, Collector<PartRecord> out) throws Exception {
            if (record.getPartId().equals(storedPartId.value())) {
                out.collect(record);
            } else {
                // do nothing
            }
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<String> descriptor =
                    new ValueStateDescriptor<>(
                            "partId", // the state name
                            TypeInformation.of(new TypeHint<String>() {}),
                            null);
            storedPartId = getRuntimeContext().getState(descriptor);
        }
    });

有没有更好的方法(从flink1.1.3开始)来完成这种加载状态模式,然后在随后的流中使用它?

ffx8fchx

ffx8fchx1#

你对 CoFlatMapFunction 是正确的。顺序 flatMap1 以及 flatMap2 无法控制调用的,并且依赖于数据到达的顺序。所以, flatMap2 可能在读取所有数据之前调用 flatMap1 .
在Flink1.1.3中,在开始处理流之前读取所有数据的唯一方法是使用 open() a方法 RichFlatMapFunction ,即您必须手动读取和解析文件。
这基本上是一种广播连接策略,也就是说,操作符的每个并行示例都会这样做。缺点是文件的数据将被复制。这样做的好处是,您不必洗牌“主”流(无需使用 keyBy() ).

相关问题