如何使用flinksql识别是否有n个特定事件到达?

bvjxkvbb  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(240)

我对Flink很陌生,正努力达到以下要求。
我的要求:我的应用程序正在收听来自EventHub的实时事件流。我需要做->当事件evt1和evt2都到达时(以任何顺序),然后根据hashmap(string,list)中的键对它们进行分组。现在,evt1和evt2可以从事件中心多次访问。我需要使用flinksql方法来实现这一点。
到目前为止我所做的:我的sql是:

select * from Events
Where name in ('EVT1','EVT2')
and (select count(*) from Events where name = 'EVT1') > 0
and (select count(*) from Events where name = 'EVT2') > 0;

我的java代码片段是-

Table table = env.sqlQuery(sql);
env.toRetractStream(table, Row.class)
 .process(new ProcessFunction <Tuple2<Boolean, Row>, Object>() {
     @Override
     public void processElement(Tuple2<Boolean, Row> value, Context ctx,Collector<Object> out) throws Exception {
         Row ev = value.f1;
         log.info(ev);
         // more code here
     }    
 });

我的测试scenario:-
我启动flink程序
evt1从eventhub接收。什么都没发生。
evt2从eventhub接收。条件匹配,log.info()一个接一个地打印2条记录。
1小时后,事件evt1再次到达。条件仍然匹配并在日志中打印这个新的evt1。
我的question:-
在上面的步骤4中,为什么条件匹配。是因为在新的evt1到达时,flink将它匹配到旧的evt2上,而旧的evt2已经在前面处理过了吗?如果是这样,我该如何防止?在第4步之后,我希望flink等待下一个匹配的evt2。而且,即使它匹配旧的事件,那么为什么只有新的evt1得到打印。
这个要求可以用flink解决吗?怎样?
flink能一次性给我提供所有的结果吗。因为,在上面的示例中,在processelement()方法中,我如何知道我处理了所有记录。因为在实际的生产场景中,我不知道我的sql将匹配多少个事件。
提前谢谢你的帮助。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题