通过合并,事件时间窗口的结束时间戳不能早于当前水印

pxy2qtax  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(578)

我们已经创建了一个正在aws kinesis analytics中运行的流flink应用程序。它主要用于处理web点击流数据(页面视图、会话等)。我们有一个来自kinesis数据流的页面视图输入,该数据流被分割成键控窗口(由会话/设备令牌键控)。
应用程序在小规模下运行良好,但当按我们预期的正常生产吞吐量(每天约100万次页面浏览量)进行测试时,我们在合并窗口时会定期遇到错误:

“The end timestamp of an event-time window cannot become earlier than the current watermark by merging.”

这种不支持的操作异常正在使我们的应用程序崩溃,当它重新启动时,它会尝试一次又一次地处理同一个窗口并崩溃。我们已将此异常追溯到以下pr(https://github.com/apache/flink/pull/3587)但我们对如何处理这个案子有点不知所措。我们的主要目标是防止应用程序崩溃或以任何方式损坏应用程序的状态。
我们已经尝试改变maxoutoforderness,看看应用程序的行为是否不同,但是还没有找到一个不会发生错误的场景,除非我们将它设置为非常低的数字,比如1。

/Create input data streams from kinesis data streams
    DataStream<String> pvInput;

    if (env.getIsLocal()) {
        pvInput = createLocalDataStream(streamEnv, "pv-stream", env);
    } else {
        pvInput = createAwsDataStream(streamEnv, env.get("pv-stream"), env);
    }

    ObjectMapper mapper = new ObjectMapper();

/* SOURCES AND INITIAL MAPPING */

    //Turn pageview strings into pageview objects and assign timestamps
    DataStream<PageView> mappedPvs = pvInput
            .map(value -> mapper.readValue(value, PageView.class)).uid("pv_mapper").name("PV Mapper")
            .filter(value -> value.timestamp != null && value.uuid != null).uid("pv_filter").name("PV Filter")
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PageView>(Time.minutes(30)) {
                @Override
                public long extractTimestamp(PageView element) {
                    return element.timestamp.getTime();
                }
            }).uid("pv_timestamp_assigner").name("PV Timestamps");

/* SESSIONIZATION */

    //Key Pageviews by uuid for sessionization
    KeyedStream<PageView, String> keyedPvStream = mappedPvs
            .keyBy((KeySelector<PageView, String>) value -> value.uuid);

    long sessionWindow = 30L;

    //Window pageviews into sessions
    DataStream<PageViewAccumulator> sessionized = keyedPvStream
        .window(ActivitySessionAssigner.withGap(Time.minutes(sessionWindow)))
        .aggregate(new PageViewAggregateFunction()).uid("session_window").name("Session Window");

预期的结果是,合并窗口的结果永远不会导致结束时间戳早于当前水印。
实际结果是它们确实发生了,导致以下异常:

{
    "locationInformation": "org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1384)",
    "logger": "org.apache.flink.runtime.executiongraph.ExecutionGraph",
    "message": "Failure type is SYSTEM on RUNNING -> FAILING.",
    "throwableInformation": [
        "java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1555506438433 window: TimeWindow{start=1555455813013, end=1555457829192}",
        "\tat org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:320)",
        "\tat org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)",
        "\tat org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)",
        "\tat org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)",
        "\tat org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)",
        "\tat org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)",
        "\tat org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)",
        "\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)",
        "\tat java.lang.Thread.run(Thread.java:748)"
    ],
    "threadName": "flink-akka.actor.default-dispatcher-16170",
    "applicationARN": "arn:aws:kinesisanalytics:us-xxx-x:XXXXXXXXXXXXX:application/XXXXX",
    "applicationVersionId": "6",
    "messageSchemaVersion": "1",
    "messageType": "INFO"
}
toe95027

toe950271#

这个问题可以通过使用下面的processfunction过滤延迟事件来解决。将此函数放在时间戳提取器和窗口函数之间可以删除任何延迟事件,从而消除发生此错误的可能性。

public class LateEventFilter extends ProcessFunction<PageView, PageView> {
    @Override
    public void processElement(PageView value, Context ctx, Collector<PageView> out) throws Exception {
        if(ctx.timestamp() > ctx.timerService().currentWatermark()){
            out.collect(value);
        }
    }
}

您还可以使用类似的函数将延迟事件输出到接收器,如下面的示例所示。

public class LateEventSideOutput extends ProcessFunction<PageView, PageView> {
    @Override
    public void processElement(PageView value, Context ctx, Collector<PageView> out) throws Exception {
        if(ctx.timestamp() <= ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}

把它全部连接起来会像这样:

DataStream<PageView> lateFilteredPvs = mappedPvs.process(new LateEventFilter()).uid("late_pv_filter").name("LatePvFilter");

DataStream<PageView> latePvs = mappedPvs.process(new LateEventSideOutput()).uid("late_pv").name("LatePv");
                l 
latePvs.addSink(latePvSink).uid("late_pv_sink").name("LatePvSink");

相关问题