java—在调用execute()之后,是否可以在flink cep中添加新模式?

brtdzjyr  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(358)

我的代码如下:

StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyObject> input = env.addSource(new MyCustomSource());

Pattern<MyObject, ?> pattern = Pattern.<MyObject>begin("start");

PatternStream<MyObject> patternStream = CEP.pattern(input, pattern);

... 定义我的模式

DataStream<MyObject> resultStream = patternStream.select(new MyCustomPatternSelectFunction());

resultStream.addSink(new MyCustomSinkFunction(subscriptionCriteria));

try
  {
    env.execute();
  }
  catch (Exception exception)
  {
    log.debug("Error while ", exception);
  }

这段代码可以工作并做我想要的事情,我得到了一个遵循我设置的模式的结果流。
我想知道的是,是否有可能将新模式应用于我稍后添加到环境中的源,从而获得与不同模式匹配的不同结果流,而无需再次调用env.execute(),因为除了新结果流之外,我还获得了冗余的旧结果流(即旧模式)多次执行)?

gopyfrb3

gopyfrb31#

目前,flink的cep库不支持开箱即用的动态模式更改。因此,一旦定义了模式并开始工作,它将只处理这个定义的模式。
但是,您可以编写自己的操作符来实现 TwoInputStreamOperator 接口,在一个输入模式定义上接收流记录,在另一个输入模式定义上接收流记录(类似于coflatmap函数)。对于每个新模式,您都必须编译一个新的 NFA 并将任何新的传入流元素 NFA 也。这样,你就可以达到你的预期行为。
将来,我们很可能会将此功能添加到flink的cep库中。

相关问题