时间窗口的最后一个元组

eqzww0vc  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(472)

我有以下情况

stream<Tuple2<String, Integer>
   .keyBy(0)
   .timeWindow(Time.of(10, TimeUnit.SECONDS))
   .sum(1)
   .flatMap(..)
   .sink()

我要做的是为我的时间窗口计算一个top n。每个窗口的顶部n由Flume存储。
我可以计算平面图中的前n个,但我不知道什么时候把它送到Flume里储存。据我所知,无法从flatmap函数中知道窗口何时结束。
我知道有一些替代方法,比如apply函数同时执行这两个操作,或者在流中创建标记来指示结束,但是我想知道是否有一个更优雅的解决方案。

zfycwa2u

zfycwa2u1#

如果你想计算 N 对于所有关键点上的每个窗口,您应该应用一个长度相同的时间窗口,在其应用方法中计算顶部 N . 你可以这样做:

final int n = 10;
stream
    .keyBy(0)
    .timeWindow(Time.of(10L, TimeUnit.SECONDS))
    .sum(1)
    .timeWindowAll(Time.of(10L, TimeUnit.SECONDS))
    .apply(new AllWindowFunction<Tuple2<String,Integer>, Tuple2<String, Integer>, TimeWindow>() {
        @Override
        public void apply(TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
            PriorityQueue<Tuple2<String, Integer>> priorityQueue = new PriorityQueue<>(n, new Comparator<Tuple2<String, Integer>>() {
                @Override
                public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
                    return o1.f1 - o2.f1;
                }
            });

            for (Tuple2<String, Integer> value : values) {
                priorityQueue.offer(value);

                while (priorityQueue.size() > n) {
                    priorityQueue.poll();
                }
            }

            for (Tuple2<String, Integer> stringIntegerTuple2 : priorityQueue) {
                out.collect(stringIntegerTuple2);
            }
        }
    })
    .print();

相关问题