我的代码如下:
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyObject> input = env.addSource(new MyCustomSource());
Pattern<MyObject, ?> pattern = Pattern.<MyObject>begin("start");
PatternStream<MyObject> patternStream = CEP.pattern(input, pattern);
... 定义我的模式
DataStream<MyObject> resultStream = patternStream.select(new MyCustomPatternSelectFunction());
resultStream.addSink(new MyCustomSinkFunction(subscriptionCriteria));
try
{
env.execute();
}
catch (Exception exception)
{
log.debug("Error while ", exception);
}
这段代码可以工作并做我想要的事情,我得到了一个遵循我设置的模式的结果流。
我想知道的是,是否有可能将新模式应用于我稍后添加到环境中的源,从而获得与不同模式匹配的不同结果流,而无需再次调用env.execute(),因为除了新结果流之外,我还获得了冗余的旧结果流(即旧模式)多次执行)?
1条答案
按热度按时间gopyfrb31#
目前,flink的cep库不支持开箱即用的动态模式更改。因此,一旦定义了模式并开始工作,它将只处理这个定义的模式。
但是,您可以编写自己的操作符来实现
TwoInputStreamOperator
接口,在一个输入模式定义上接收流记录,在另一个输入模式定义上接收流记录(类似于coflatmap函数)。对于每个新模式,您都必须编译一个新的NFA
并将任何新的传入流元素NFA
也。这样,你就可以达到你的预期行为。将来,我们很可能会将此功能添加到flink的cep库中。