我需要计算a在一天中发生的次数,以及b在15分钟内发生的次数。流可能是a1,a2,b1,b2,a3,b3,b4,b5,a4,a5,a6,a7,b6。在我的例子中,事件结果是a2,b1 a3,b3 a7,b6.当匹配程序发生时,我需要接收实时结果。我厌倦了一些事情。我认为只有使用flink-cep才能实现。但是flink-sql-cep不支持聚合。它只计算发生的事件,在这种情况下,如何用一个sql来完成这个任务。
我花了两个步骤来做这件事。我先用flink-sql-cep来匹配,然后沉入Kafka。在步骤中,我找到了前Kafka和使用窗口聚合。
第一步:选择pins作为pin,'first-step'作为result\u id,cast(order\u amount作为varchar)作为result\u value,event\u time作为result\u time from stra\u dtpipeline match\u recognize(按pin分区
按事件和时间度量排序
t1.pin作为pins,'1'作为order\u amount,localtimestamp作为event\u time每个匹配后的一行在间隔'30'秒内跳到下一行模式(t1 t2)
定义
t1作为t1。act_type='100001',t2作为t2。act_type='100002')第二步:选择pin,'job5'作为result_id,cast(sum(1)over(partition by pin,cast(date_format(event_time,'%y%m%d')as varchar)order by event_time rows between interval'1'day previous and current row)作为varchar作为result_value,当前时间戳作为stra\u dtpipeline\u mid的结果时间,其中result\u id='first-step'和dayofmonth(当前日期)=dayofmonth(事件时间)
我希望用一个sql来完成这个任务。
1条答案
按热度按时间mzaanser1#
您可以使用子查询或视图将两个查询组合成一个查询。
那大概是
或者