问题:flink应用程序没有接收和处理kinesis连接器关闭时(由于重新启动)生成的事件
我们有下面的flink env设置
env.enableCheckpointing(1000ms);
env.setStateBackend(new RocksDBStateBackend("file:///<filelocation>", true));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(pause);
env.getCheckpointConfig().setCheckpointTimeout(timeOut);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(concurrency);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
动觉具有以下初始形态
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"LATEST");
有趣的是,当我改变动觉配置来回复事件时,例如。
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON");
flink正在从kinesis接收所有缓冲记录(包括flink应用程序关闭之前、期间和之后生成的事件),并对其进行处理。因此,这种行为违反了flink应用程序的“恰好一次”属性。
有人能指出我遗漏的一些明显的东西吗?
1条答案
按热度按时间bogh5gae1#
flink kinesis连接器确实将碎片序列号存储在状态中,以便只进行一次处理。
根据您的描述,在您的作业“restart”中,检查点状态似乎不受尊重。
首先要消除一个显而易见的问题:您的作业是如何从重启中恢复的?您是从保存点恢复,还是从以前的检查点自动重新启动?