我正在阅读的源代码 CepOperator
来自flink cep,对以下代码段有疑问:
public void processElement(StreamRecord<IN> element) throws Exception {
if (isProcessingTime) {
... ...
} else {
long timestamp = element.getTimestamp();
IN value = element.getValue();
if (timestamp > lastWatermark) {
saveRegisterWatermarkTimer();
bufferEvent(value, timestamp);
} else if (lateDataOutputTag != null) {
output.collect(lateDataOutputTag, element);
} else {
numLateRecordsDropped.inc();
}
}
}
我不明白为什么每次收到新元素, saveRegisterWatermarkTimer()
接到电话了?以下是源代码:
private void saveRegisterWatermarkTimer() {
long currentWatermark = timerService.currentWatermark();
// protect against overflow
if (currentWatermark + 1 > currentWatermark) {
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1);
}
}
它几乎总是注册一个新的事件时间计时器。它不会产生太多的计时器吗?
谢谢你的解释。
1条答案
按热度按时间pnwntuvh1#
为注册计时器
currentWatermark + 1
有点习惯用法,可以在您希望知道每个到达的水印(或者换句话说,每次事件时钟提前)时使用。flink中的计时器是自动消除重复数据的:对于任何一对(密钥、时间戳)计时器,最多只能有一个计时器,因此不会产生太多计时器的风险。