kafka流:如何基于跳跃窗口触发事件,以及如何基于作为跳跃窗口一部分的窗口组合集触发事件

yqkkidmi  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(264)

这是我们的拓扑结构:
测试事件--->groupbykey-->window aggregation-->suppress-->filter-->rekey
这是流程
我们有1分钟的跳伞时间。总窗口大小为5分钟。跳跃窗口每5分钟评估一次,如何获得每1分钟跳跃次数??
总时间为5分钟,跃点持续时间为1分钟。

1. How to TRIGGER some event based on 1 min HOP

2. How to TRIGGER some event based on TOTAL 5 min WINDOW

   ||-----||----||-------||------||-----|

          ||-----||----||-------||------||-----|

我们想做两件事。
我们要计算1分钟跳跃窗口中的事件总数。如果事件总数>某个阈值,则为该1分钟跳跃窗口触发一个组合事件。
然后,如果组合事件的计数大于某个数字,则要触发警报。
从kafka窗口api

Code looks like this:
KStream<String, DataPointUri> KS0 = builder.stream(inputTopicsList,);
KGroupedStream<String, DataPointUri> KS1 = KS0.groupByKey();
Duration windowDuration =getDuration(...)); 
Duration hopDuration =getDuration(env.getProperty(...)));
TimeWindowedKStream<String, DataPointUri> KS2 = KS1.windowedBy(TimeWindows.of(windowDuration).advanceBy(hopDuration).grace(Duration.ofSeconds(10)));
KTable<Windowed<String>, Long> uriCount = KS2.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>with(Serdes.String(), Serdes.Long()));

在这里,我们得到了5分钟窗口计数。不适用于1分钟窗口。如何获得1分钟窗口的详细信息?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题