在没有以下事件时发出最后一个窗口

pcww981p  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(321)

我有一个包含事件的流 Event(Id, Type, Date) 我想处理那些按(id,type)和活动会话分组的事件
例如,从事件

Event1: Event(1, T1, 2018-01-24)
Event2: Event(2, T1, 2018-01-26)
Event3: Event(1, T2, 2018-01-28)
Event4: Event(1, T2, 2018-01-28)
...

我希望有以下窗口:

Window1 with Event1
Window2 with Event2
Window3 with Event3 and Event4
...

根据我的理解,我应该能够在键控流上使用事件时间会话窗口来实现这一点。但是在我的代码中,只打印包含第一个事件(event1)的第一个窗口(window1)。

environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

environment
    .addSource(kafkaConsumer.setStartFromEarliest())
    .assignTimestampsAndWatermarks(<timestamp assigner>)
    .keyBy(e => (e.getId, e.getType))
    .window(EventTimeSessionWindows.withGap(Time.days(1)))
    .apply(new WindowFunction[Event, String, (String, String), TimeWindow]() {
          override def apply(key: (String, String), window: TimeWindow, input: Iterable[Event], out: Collector[String]): Unit = {
            var count = 0L
            for (in <- input) {
              count = count + 1
            }
            out.collect(s"Window $window count: $count")
          }
        })
  .print()

它是处理历史事件和会话窗口的适当方法吗?

tyg4sfes

tyg4sfes1#

org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows 是你想要的。不同于 EventTimeSessionWindows , ProcessingTimeSessionWindows 启动后立即触发 Gap 时光流逝,没有等待下一个事件的到来。

rqcrx0a6

rqcrx0a62#

在您的案例中,问题是水印总是基于传入事件生成的。如果没有传入事件,则水印不会进行。在您的示例中,只会发出window1,因为只有对于event1,后面还有另一个带有时间戳的事件,它将水印推进到会话间隔之外。对于其余三个事件,没有此类元素。对于事件3和事件4,根本没有此类事件。另外,因为流是键控的,所以具有不同键的元素是独立处理的。在这种情况下,水印不会前进,因此不会发出窗口。

相关问题