我问这个问题是因为我在apache flink中设置变量时遇到了麻烦。我想使用一个流来获取数据,用它来初始化第二个流所需的变量。问题是这些流是并行执行的,这会导致初始化第二个流时丢失一个值。示例代码:
KafkaSource<Object> mainSource1 = KafkaSource.<Object>builder()
.setBootstrapServers(...)
.setTopicPattern(Pattern.compile(...))
.setGroupId(...)
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(new ObjectDeserializer())
.build();
DataStream<Market> mainStream1 = env.fromSource(mainSource, WatermarkStrategy.forMonotonousTimestamps(), "mainSource");
// fetching data from the stream and setting variables
Map<TopicPartition, Long> endOffset = new HashMap<>();
endOffset.put(new TopicPartition("topicName", 0), offsetFromMainStream1);
KafkaSource<Object> mainSource2 = KafkaSource.<Object>builder()
.setBootstrapServers(...)
.setTopicPattern(Pattern.compile(...))
.setGroupId(...)
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.offsets(endOffset))
.setDeserializer(new ObjectDeserializer())
.build();
DataStream<Market> mainStream2 = env.fromSource(mainSource, WatermarkStrategy.forMonotonousTimestamps(), "mainSource");
// further stream operations
我将调用第一个流,从中提取数据并在本地设置它,然后可以在第二个流上的操作中使用它
1条答案
按热度按时间xqk2d5yq1#
你想用一个流的数据来控制另一个流的行为,最好的方法是使用Broadcast状态模式。
这涉及到从
mainStream1
创建一个BroadcastStream
,然后将mainStream2
连接到mainStream1
。现在mainStream2
可以访问来自mainStream1
的数据。这是一个基于你的代码的高级例子。我假设键是String。
这里详细解释了这个概念。这里显示的代码也是修改后的版本-https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#the-broadcast-state-pattern