Flink接线员

cnh2zyt3  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(329)

我正在阅读的源代码 CepOperator 来自flink cep,对以下代码段有疑问:

public void processElement(StreamRecord<IN> element) throws Exception {
        if (isProcessingTime) {
            ... ...
        } else {

            long timestamp = element.getTimestamp();
            IN value = element.getValue();

            if (timestamp > lastWatermark) {

                saveRegisterWatermarkTimer();

                bufferEvent(value, timestamp);

            } else if (lateDataOutputTag != null) {
                output.collect(lateDataOutputTag, element);
            } else {
                numLateRecordsDropped.inc();
            }
        }
    }

我不明白为什么每次收到新元素, saveRegisterWatermarkTimer() 接到电话了?以下是源代码:

private void saveRegisterWatermarkTimer() {
    long currentWatermark = timerService.currentWatermark();
    // protect against overflow
    if (currentWatermark + 1 > currentWatermark) {
        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1);
    }
}

它几乎总是注册一个新的事件时间计时器。它不会产生太多的计时器吗?
谢谢你的解释。

pnwntuvh

pnwntuvh1#

为注册计时器 currentWatermark + 1 有点习惯用法,可以在您希望知道每个到达的水印(或者换句话说,每次事件时钟提前)时使用。flink中的计时器是自动消除重复数据的:对于任何一对(密钥、时间戳)计时器,最多只能有一个计时器,因此不会产生太多计时器的风险。

相关问题