Flink WindowAssigner是否为每个到达的元素分配一个窗口?

hi3rlvi2  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(134)

why TumblingProcessingTimeWindows assigns a window for every arrived element code as below? For example, a TimeWindow with starttime of 1s and endtime 5s, then all elements between the time are expected to one window, but from the code below, every element gets a new window, why?

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        final long now = context.getCurrentProcessingTime();
        long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }
}

WindowOperator invoke windowAssigner.assignWindows for every element, why:

WindowOperator.java
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);
}
ktca8awb

ktca8awb1#

这是实现方式的产物。
最重要的是窗口的内容是如何存储在状态后端的。Flink的状态后端是由三个部分组成的:(键、名称空间、值)。对于键控时间窗口,存储的内容是

  • key:关键字
    • 命名空间 *:时间窗口的副本(即其类、开始和结束)
  • value:分配给此窗格的元素列表

TimeWindow对象只是一个方便的 Package 器,用于保存每个窗口的标识信息,而不是用于存储分配给窗口的元素的容器。
其中涉及的代码相当复杂,但是如果你想深入了解它的核心,你可以看看org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement(还有EvictingWindowOperator#processElement,它们非常相似)。这些方法使用keyed state在窗口中存储每个传入的事件,如下所示:

windowState.setCurrentNamespace(stateWindow);
windowState.add(element.getValue());

其中windowState为

/** The state in which the window contents is stored. Each window is a namespace */
private transient InternalAppendingState<K, W, IN, ACC, ACC> windowState;

InternalAppendingStateListState的一个变体,它公开了名称空间(Flink的公共API不提供对该名称空间的访问)。

相关问题