How to join two Kafka streams in Flink (the first stream is continuously flowing, the second is static with only a few records, like a master table)

fjaof16o · posted 2022-12-09 in Apache

I want to enrich my first stream with the help of the second stream, so that the flowing records are continuously joined against the second stream like a lookup table, which I want to keep in memory forever. Is there any code example or any Flink API that fits this use case?


omvjsjqw · answer #1

You can find an example of connected streams with shared state on the Ververica training page: https://training.ververica.com (Stateful Stream Processing, Slide 13)

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class StreamingJob {

  // Processes two connected, keyed streams: stream 1 is the control stream,
  // stream 2 is the data stream. Any key seen on the control stream is blocked.
  public static class ControlFunction extends KeyedCoProcessFunction<String, String, String, String> {
    private ValueState<Boolean> blocked;

    @Override
    public void open(Configuration config) {
      blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
    }

    // Control stream element: mark the current key as blocked.
    @Override
    public void processElement1(String controlValue, Context context, Collector<String> out) throws Exception {
      blocked.update(Boolean.TRUE);
    }

    // Data stream element: forward it only if its key has not been blocked.
    @Override
    public void processElement2(String dataValue, Context context, Collector<String> out) throws Exception {
      if (blocked.value() == null) {
        out.collect(dataValue);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
    DataStream<String> data = env
      .fromElements("Flink", "DROP", "Forward", "IGNORE")
      .keyBy(x -> x);

    control
      .connect(data)
      .process(new ControlFunction())
      .print();

    env.execute();
  }
}

In your case, you would need to keep the contents of the 2nd stream in the KeyedCoProcessFunction's state and have the 1st stream read from that state to join it with its own elements. You'll need to think about how to key your streams and what kind of state to use, but that is the main idea.
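As a rough illustration of that idea, here is a minimal sketch of such an enrichment join, assuming the first stream carries event ids and the second stream carries "id,description" lookup records (the EnrichmentJob/EnrichmentFunction names, the sensor data, and the comma-separated record format are made up for this example, not part of any Flink API): the lookup stream is stored in keyed ValueState, and each element of the main stream reads that state to enrich itself. In a real job both streams would come from Kafka sources, and you might buffer main-stream events in ListState while the lookup record for their key has not arrived yet.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: the first stream carries event ids, the second stream
// carries "id,description" lookup records that are kept in keyed state forever.
public class EnrichmentJob {

  public static class EnrichmentFunction
      extends KeyedCoProcessFunction<String, String, String, String> {

    // Latest lookup record for the current key; never cleared, so the
    // "master table" stays in state for the lifetime of the job.
    private ValueState<String> lookup;

    @Override
    public void open(Configuration config) {
      lookup = getRuntimeContext().getState(
          new ValueStateDescriptor<>("lookup", String.class));
    }

    // Elements of the main (first) stream: join against whatever is in state.
    @Override
    public void processElement1(String event, Context ctx, Collector<String> out) throws Exception {
      String enrichment = lookup.value();
      if (enrichment != null) {
        out.collect(event + " -> " + enrichment);
      } else {
        // No lookup record seen yet for this key; a real job might buffer the
        // event in ListState instead of emitting it unenriched.
        out.collect(event + " -> (no enrichment yet)");
      }
    }

    // Elements of the lookup (second) stream: only update the state.
    @Override
    public void processElement2(String lookupRecord, Context ctx, Collector<String> out) throws Exception {
      lookup.update(lookupRecord.substring(lookupRecord.indexOf(',') + 1));
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // In the real job both streams would be Kafka sources; fromElements is used
    // here only to keep the sketch self-contained.
    DataStream<String> events = env.fromElements("sensor-1", "sensor-2", "sensor-1");
    DataStream<String> lookups = env.fromElements(
        "sensor-1,located in hall A",
        "sensor-2,located in hall B");

    events
        .keyBy(e -> e)                                               // key the main stream by its id
        .connect(lookups.keyBy(l -> l.substring(0, l.indexOf(',')))) // key lookups by the same id
        .process(new EnrichmentFunction())
        .print();

    env.execute("enrichment join sketch");
  }
}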
