why TumblingProcessingTimeWindows
assigns a window for every arrived element code as below? For example, a TimeWindow with starttime of 1s and endtime 5s, then all elements between the time are expected to one window, but from the code below, every element gets a new window, why?
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
final long now = context.getCurrentProcessingTime();
long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
}
}
WindowOperator invoke windowAssigner.assignWindows
for every element, why:
WindowOperator.java
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
}
1条答案
按热度按时间ktca8awb1#
这是实现方式的产物。
最重要的是窗口的内容是如何存储在状态后端的。Flink的状态后端是由三个部分组成的:(键、名称空间、值)。对于键控时间窗口,存储的内容是
TimeWindow
对象只是一个方便的 Package 器,用于保存每个窗口的标识信息,而不是用于存储分配给窗口的元素的容器。其中涉及的代码相当复杂,但是如果你想深入了解它的核心,你可以看看
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement
(还有EvictingWindowOperator#processElement
,它们非常相似)。这些方法使用keyed state在窗口中存储每个传入的事件,如下所示:其中windowState为
InternalAppendingState
是ListState
的一个变体,它公开了名称空间(Flink的公共API不提供对该名称空间的访问)。