flink流窗口触发器

8e2ybdfx  于 2021-06-25  发布在  Flink
关注(0)|答案(3)|浏览(516)

我有Flink流,我正在计算一些东西在某个时间窗口说30秒。
这里发生了什么,它是给我的结果,我的聚合以前的窗口以及。
假设前30秒我得到结果10。
接下来的三十秒我想要一个新的结果,取而代之的是最后一个窗口结果+新的,以此类推。
所以我的问题是我如何得到每个窗口的新结果。

ckocjqey

ckocjqey1#

您描述的功能可以在滚动窗口中找到:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#tumbling-Windows
更多的细节和/或代码会有所帮助:)

dwbf0jvd

dwbf0jvd2#

你需要使用清除触发器。您需要的是fire和purge(发出并删除窗口内容),默认的flink触发器所做的是fire(发出并保留窗口内容)。

input
    .keyBy(...)
    .timeWindow(Time.seconds(30))

    // The important part: Replace the default non-purging ProcessingTimeTrigger
    .trigger(new PurgingTrigger[..., TimeWindow](ProcessingTimeTrigger))

    .reduce(...)

要获得更深入的解释,请查看触发器和fire与fire和purge的比较。
触发器确定窗口(由窗口赋值器形成)何时可以由窗口函数处理。每个windowassigner都有一个默认触发器。如果默认触发器不符合您的需要,可以使用触发器(…)指定自定义触发器。
当触发器触发时,它可以触发或触发\u和\u清除。当fire保留窗口的内容时,fire和purge会删除其内容。默认情况下,预实现的触发器只是简单地触发,而不清除窗口状态。

y1aodyip

y1aodyip3#

我有点晚了,但我遇到了同样的问题与op的。后来我发现我自己的代码中有一个bug。我的错误可以作为你解决问题的参考。

// Old code (modified to be an example):
val tenSecondGrouping: DataStream[MyCustomGrouping] = userIdsStream
      .keyBy(_.somePartitionedKey)
      .window(TumblingProcessingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
      .trigger(ProcessingTimeTrigger.create())
      .aggregate(new MyCustomAggregateFunc(new MyCustomGrouping()))

错误发生在新的mycustomgrouping上:我无意中创建了一个单例mycustomgrouping对象,并在mycustomaggregatefunc中重用它。随着越来越多的滚动窗口的创建,后来的聚合结果变得疯狂!修复方法是在每次触发mycustomaggregatefunc时创建新的mycustomgrouping。所以:

// New code, problem solved
          ...
          .aggregate(new MyCustomAggregateFunc(() => new MyCustomGrouping())) 
// passing in a func to create new object per trigger

相关问题