ApacheFlink—应用windowfunction会丢弃结果中的事件

zujrkrfu  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(797)

我试图计算流中的元素,同时用窗口的结束时间丰富结果。
使用flink提供的kafka10消费者从kafka接收事件。使用eventtime。一个简单的keyedstream.count(…)很好用。这条小溪长4分钟。通过使用3分钟的时间窗口,只接收一个输出。应该有两个。结果是用bucketingsink写的。

val count = stream.map( m => (m.getContext, 1) )
    .keyBy( 0 )
    .timeWindow( Time.minutes(3) )
    .apply( new EndTimeWindow() )
    .map( new JsonMapper() )
count.addSink( countSink )
class EndTimeWindow extends WindowFunction[(String,Int),(String, Int),Tuple, TimeWindow]{
    override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit =  {
        var sum: Int = 0
        for( value <-input ) {
            sum = sum + value._2;
        }
        out.collect( (window.getEnd.toString, new Integer(sum ) ))
    }
}

通过使用3分钟的时间窗口,只接收一个事件量较小的输出。应该有两个输出。

q9yhzks0

q9yhzks01#

更准确地说,当合适的水印到达时,事件时间窗口关闭——在有界无序水印生成器中,这将发生(1)如果事件到达的时间足够远,或者(2)如果事件来自到达其末端的有限源,因为在这种情况下,flink将发送一个时间戳为long.max\的水印,该水印将关闭所有打开的事件时间窗口。然而,有了Kafka作为你的来源,那就不会发生了。

c8ib6hqw

c8ib6hqw2#

好吧,我想,我知道出了什么问题。错误发生了,因为我对这个问题的想法是错误的。因为我使用的是eventtime,所以当时间戳大于窗口结束时间的事件到达时,窗口会关闭。当流结束时,不再有元素到达。因此,窗口永远不会关闭。

相关问题