You can find an example of connected streams with shared state on the Ververica training page: https://training.ververica.com (Stateful Stream Processing, Slide 13). The example looks like this:
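
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    public class StreamingJob {

        // Nested so that "static" is legal; type parameters are <key, input 1, input 2, output>.
        public static class ControlFunction extends KeyedCoProcessFunction<String, String, String, String> {
            // Keyed state shared by both inputs: one flag per key.
            private ValueState<Boolean> blocked;

            @Override
            public void open(Configuration config) {
                blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
            }

            // Control stream: mark the current key as blocked.
            @Override
            public void processElement1(String controlValue, Context context, Collector<String> out) throws Exception {
                blocked.update(Boolean.TRUE);
            }

            // Data stream: forward the element only if its key has not been blocked.
            @Override
            public void processElement2(String dataValue, Context context, Collector<String> out) throws Exception {
                if (blocked.value() == null) {
                    out.collect(dataValue);
                }
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Both streams are keyed by the element itself, so matching values share state.
            DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
            DataStream<String> data = env
                .fromElements("Flink", "DROP", "Forward", "IGNORE")
                .keyBy(x -> x);

            // The two sources race: a data element that arrives before its
            // control element is forwarded, because the key is not blocked yet.
            control
                .connect(data)
                .process(new ControlFunction())
                .print();

            env.execute();
        }
    }
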
In your case, you would need to keep the contents of the second stream in the state of a KeyedCoProcessFunction and have the first stream read from that state to join it with its own elements. You'll need to think about how to key your streams and what kind of state to use, but that is the main idea.
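
To make the idea concrete, here is a minimal sketch of that join logic in plain Java, with no Flink dependency. It is a model under assumptions, not Flink code: the class name KeyedJoinSketch, the "|" output format, and the explicit key parameter are all hypothetical, and the two HashMaps stand in for what would be keyed state in a real KeyedCoProcessFunction (for example a ValueState for the second stream's value and a ListState for buffered first-stream elements). It also assumes you want to buffer first-stream elements that arrive before their match, which may or may not suit your case.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a keyed two-input join; the maps stand in for Flink keyed state. */
class KeyedJoinSketch {
    // Latest stream-2 value per key (assumption: ValueState<String> in a real job).
    private final Map<String, String> reference = new HashMap<>();
    // Stream-1 elements still waiting for a stream-2 value (assumption: ListState<String>).
    private final Map<String, List<String>> pending = new HashMap<>();

    /** Stream 1: join with the stored stream-2 value, or buffer until one arrives. */
    List<String> processElement1(String key, String value) {
        List<String> out = new ArrayList<>();
        String ref = reference.get(key);
        if (ref != null) {
            out.add(value + "|" + ref);
        } else {
            pending.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
        return out;
    }

    /** Stream 2: store the value for this key and flush any buffered stream-1 elements. */
    List<String> processElement2(String key, String value) {
        reference.put(key, value);
        List<String> out = new ArrayList<>();
        List<String> waiting = pending.remove(key);
        if (waiting != null) {
            for (String w : waiting) {
                out.add(w + "|" + value);
            }
        }
        return out;
    }
}
```

In a real job Flink scopes the state to the current key for you, so the explicit key parameter and the outer maps disappear, and results go to the Collector instead of a returned list. Buffering in processElement1 matters because, as with the control example above, nothing guarantees that the second stream's element arrives first.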