Setting variables in Apache Flink

Posted by uajslkp6 on 2022-12-31 in Apache

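I'm asking because I'm having trouble setting variables in Apache Flink. I want to use one stream to fetch data and use that data to initialize a variable needed by a second stream. The problem is that the streams run in parallel, so the value is missing when the second stream is initialized. Sample code: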

KafkaSource<Market> mainSource1 = KafkaSource.<Market>builder()
      .setBootstrapServers(...)
      .setTopicPattern(Pattern.compile(...))
      .setGroupId(...)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setDeserializer(new ObjectDeserializer())  // assumed to produce Market records
      .build();

// Build the first stream from mainSource1
DataStream<Market> mainStream1 = env.fromSource(mainSource1, WatermarkStrategy.forMonotonousTimestamps(), "mainSource1");

// fetching data from the stream and setting variables

Map<TopicPartition, Long> endOffset = new HashMap<>();
endOffset.put(new TopicPartition("topicName", 0), offsetFromMainStream1);


KafkaSource<Market> mainSource2 = KafkaSource.<Market>builder()
      .setBootstrapServers(...)
      .setTopicPattern(Pattern.compile(...))
      .setGroupId(...)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setBounded(OffsetsInitializer.offsets(endOffset))  // stop at the offset taken from mainStream1
      .setDeserializer(new ObjectDeserializer())  // assumed to produce Market records
      .build();

// Build the second stream from mainSource2
DataStream<Market> mainStream2 = env.fromSource(mainSource2, WatermarkStrategy.forMonotonousTimestamps(), "mainSource2");

// further stream operations

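I want to consume the first stream, extract data from it and store it in a local variable, and then use that variable in the operations on the second stream.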

Answer by xqk2d5yq:

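You want to use the data from one stream to control the behavior of another, and the best way to do that is the Broadcast State pattern.
This involves creating a BroadcastStream from mainStream1 and connecting mainStream2 to that broadcast stream. mainStream2 can then access the data coming from mainStream1 at runtime. (A plain local variable cannot do this: env.fromSource(...) only defines the pipeline and nothing runs until env.execute() is called, so the value does not exist yet when mainSource2 is built.)
Here is a high-level example based on your code. I assume the key is a String.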

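import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Broadcast state descriptor: key = String id, value = Market
MapStateDescriptor<String, Market> stateDescriptor = new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Market>() {}));
// Broadcast mainStream1 to every parallel instance (no keyBy needed on this side)
BroadcastStream<Market> mainStream1BroadcastStream = mainStream1.broadcast(stateDescriptor);

// The non-broadcast side must be keyed to use a KeyedBroadcastProcessFunction
DataStream<Market> yourOutput = mainStream2
                 .keyBy(Market::getId)  // getId() stands in for your key field
                 .connect(mainStream1BroadcastStream)
                 .process(new KeyedBroadcastProcessFunction<String, Market, Market, Market>() {
                     @Override
                     public void processBroadcastElement(Market value, Context ctx, Collector<Market> out) throws Exception {
                         // Store each mainStream1 element in the broadcast state
                         ctx.getBroadcastState(stateDescriptor).put(value.getId(), value);
                     }
                     @Override
                     public void processElement(Market value, ReadOnlyContext ctx, Collector<Market> out) throws Exception {
                         // Read mainStream1 data; null if it has not arrived yet for this key
                         Market fromStream1 = ctx.getBroadcastState(stateDescriptor).get(value.getId());
                         out.collect(value);
                     }
                 });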

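The concept is explained in detail in the Flink documentation, and the code shown here is a modified version of the example there: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#the-broadcast-state-pattern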
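One caveat: the two inputs of a connected stream have no ordering guarantee between them, so a mainStream2 element can reach processElement before the mainStream1 element it depends on has been put into the broadcast state. That is the same "missing value" symptom described in the question. A common workaround is to buffer such elements until the value arrives. The following is a plain-Java model of that buffering logic with no Flink dependency. The class name BroadcastBufferSketch, the String keys and the Long values are hypothetical, and in a real KeyedBroadcastProcessFunction the buffer would be keyed state rather than a HashMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model (no Flink dependency) of the logic inside a
 * KeyedBroadcastProcessFunction: main-stream elements that arrive before the
 * broadcast value for their key are buffered, then released when it arrives.
 */
public class BroadcastBufferSketch {
    // Stands in for Flink's broadcast state (key -> value taken from mainStream1)
    private final Map<String, Long> broadcastState = new HashMap<>();
    // Stands in for per-key keyed state holding mainStream2 elements that must wait
    private final Map<String, List<String>> pending = new HashMap<>();
    // Stands in for Collector<OUT>
    private final List<String> output = new ArrayList<>();

    /** Broadcast side: record the value, then flush any elements waiting for this key. */
    public void processBroadcastElement(String key, long value) {
        broadcastState.put(key, value);
        List<String> waiting = pending.remove(key);
        if (waiting != null) {
            for (String element : waiting) {
                emit(element, value);
            }
        }
    }

    /** Main side: use the broadcast value if it is known, otherwise buffer the element. */
    public void processElement(String key, String element) {
        Long value = broadcastState.get(key);
        if (value == null) {
            pending.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        } else {
            emit(element, value);
        }
    }

    // Combine an element with the broadcast value it depends on
    private void emit(String element, long value) {
        output.add(element + "@" + value);
    }

    public List<String> getOutput() {
        return output;
    }

    public static void main(String[] args) {
        BroadcastBufferSketch fn = new BroadcastBufferSketch();
        fn.processElement("topicName-0", "a");          // arrives too early: buffered
        fn.processBroadcastElement("topicName-0", 42L); // releases "a"
        fn.processElement("topicName-0", "b");          // value already known: emitted directly
        System.out.println(fn.getOutput());             // [a@42, b@42]
    }
}
```

Here "a" is held back until the broadcast value for its key shows up, while "b" is emitted immediately. In an actual Flink job, flushing the buffer from the broadcast side would have to go through the Context's keyed-state access (for example applyToKeyedState), or be deferred to the next element for that key or to a timer; which fits better depends on the job.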
