apacheflink和事件排序

tpgth1q7  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(246)

我是apache flink的新手,我正在尝试组织来自物联网设备的数据,这些设备监视wifi扫描设备。典型事件如下所示:
{nodeid,设备mac地址,时间,类型}
我希望能够查看mac地址的上一个事件,如果当前事件时间小于该mac地址的上一个事件的指定间隔(例如60秒),我希望更新该mac的运行事件总数。但是,如果间隔时间已过,我希望将聚合事件写入数据库。聚合事件将类似于:
{节点ID,mac地址,上次事件时间,事件总数,日期,小时}
更复杂的是,我想保持每个节点每小时和每天的运行总数保持每个节点所有mac地址事件的运行总数,一旦相关周期完成,输出到数据库。
我已经阅读了文档,但是我正在努力理解完成这项任务所需要的部分。
提前谢谢

oaxa6hgo

oaxa6hgo1#

这听起来像是为会话窗口生成聚合的问题。
你可以这样做:

stream.keyBy(new KeySelector<Event, Integer>() {
    @Override
    public Integer getKey(Event value) throws Exception {
        return value.nodeId;
    }
}).window(EventTimeSessionWindows.withGap(Time.seconds(60)))
  .apply(new ReduceFunction<LogRow>(), new WindowFunction<LogRow, Object, Integer, TimeWindow>())

相关问题