我的代码有助于确定flink cep中记录的数量是否超过25。因此,当我使用进程时间时,它匹配所有模式,但当我使用事件时间时,它不匹配最后一条记录。
{"trasanction_id":196,"customer_id":27,"datetime":"1576499008876","amount":6094,"state":"SUCCESS"}
{"trasanction_id":197,"customer_id":27,"datetime":"1576499017565","amount":547,"state":"SUCCESS"}
{"trasanction_id":198,"customer_id":27,"datetime":"1576499029116","amount":6824,"state":"SUCCESS"}
{"trasanction_id":196,"customer_id":27,"datetime":"1576499053211","amount":6094,"state":"SUCCESS"}
{"trasanction_id":197,"customer_id":28,"datetime":"1576499063867","amount":547,"state":"FAILED"}
{"trasanction_id":198,"customer_id":28,"datetime":"1576499073566","amount":6824,"state":"FAILED"}
以上是我的记录。我有兴趣匹配每一个事件的金额大于25在事件时间。理想情况下,它应该检测所有记录(它在处理时间内这样做),因为所有记录的数量都大于25。到现在为止,我正在使用boundedoutofertime提取技术,3秒用于无序。
请帮我理解这个。提前谢谢!:)
1条答案
按热度按时间46qrfjad1#
因为cep与时态模式匹配,所以在处理事件时间戳时,首先按照时间戳的顺序对事件进行排序。此排序涉及缓冲每个事件,直到水印赶上该事件,以便为任何较早的事件提供时间。
因为您的水印被配置为落后于流的前缘(即迄今为止最大的时间戳)3秒,所以流的水印永远不会到达最后一个事件的时间戳。这就是为什么没有处理最后一个事件。flink正在等待是否会有任何早期事件到达,并且在水印指示流通过最后一个事件的时间戳完成之前不会放弃。