// Old code (modified to be an example):
val tenSecondGrouping: DataStream[MyCustomGrouping] = userIdsStream
.keyBy(_.somePartitionedKey)
.window(TumblingProcessingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
.trigger(ProcessingTimeTrigger.create())
.aggregate(new MyCustomAggregateFunc(new MyCustomGrouping()))
// New code, problem solved
...
.aggregate(new MyCustomAggregateFunc(() => new MyCustomGrouping()))
// passing in a func to create new object per trigger
3条答案
按热度按时间ckocjqey1#
您描述的功能可以在滚动窗口中找到:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#tumbling-Windows
更多的细节和/或代码会有所帮助:)
dwbf0jvd2#
你需要使用清除触发器。您需要的是fire和purge(发出并删除窗口内容),默认的flink触发器所做的是fire(发出并保留窗口内容)。
要获得更深入的解释,请查看触发器和fire与fire和purge的比较。
触发器确定窗口(由窗口赋值器形成)何时可以由窗口函数处理。每个windowassigner都有一个默认触发器。如果默认触发器不符合您的需要,可以使用触发器(…)指定自定义触发器。
当触发器触发时,它可以触发或触发\u和\u清除。当fire保留窗口的内容时,fire和purge会删除其内容。默认情况下,预实现的触发器只是简单地触发,而不清除窗口状态。
y1aodyip3#
我有点晚了,但我遇到了同样的问题与op的。后来我发现我自己的代码中有一个bug。我的错误可以作为你解决问题的参考。
错误发生在新的mycustomgrouping上:我无意中创建了一个单例mycustomgrouping对象,并在mycustomaggregatefunc中重用它。随着越来越多的滚动窗口的创建,后来的聚合结果变得疯狂!修复方法是在每次触发mycustomaggregatefunc时创建新的mycustomgrouping。所以: