kafka最后一次偏移量在应用程序重启时增加

ukqbszuj  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(292)

我有一个javaakka应用程序,它从kafka读取、处理消息并手动提交。
我使用的是0.10.1.1api的高级使用者。
奇怪的是,当我关闭应用程序并再次启动它时,偏移量比上次提交的偏移量大一点,我找不到原因。
代码中只有一个提交点。

else if(message.getClass() == ProcessedBatches.class) {
        try {
            Logger.getRootLogger().info("[" + this.name + "/Reader] Commiting ...");
            ProcessedBatches msg = (ProcessedBatches) message;
            consumer.commitSync(msg.getCommitInfo());
            lastCommitData = msg.getCommitInfo();
            lastCommit = System.currentTimeMillis();
        } catch (CommitFailedException e) {
            Logger.getRootLogger().info("[" + this.name + "/Reader] Failed to commit... Last commit: " + lastCommit + " | Last batch: " + lastBatch + ". Current uncommited messages: " + uncommitedMessages);
            self().tell(HarakiriMessage.getInstance(), self());
        }
    }

提交之后,我将偏移量hashmap保存在lastcommitdata中,以便对其进行调试。
然后我添加了一个shutdown钩子来打印lastcommitdata变量,以检查每个分区提交的最后一个偏移量是多少。

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        String output = 
                "############## SHUTTING DOWN CONSUMER ############### \n" + 
                lastCommitData+"\n";
        System.out.println(output);
    }));

我还有一个consumerrebalancelistener,在consumer启动时检查每个分区的开始位置。

new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {}

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            for (TopicPartition p:collection
                 ) {
                System.out.println("Starting position "+p.toString()+":" + consumer.position(p));
            }
            coordinator.setRebalanceTimestamp(System.currentTimeMillis());
        }
    });

一个分区的示例:
停机前偏移量:3107169023
分配分区时的偏移量:3107180350
正如您所看到的,每封邮件之间几乎有10万条消息。
消费者属性如下:

Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("group.id", group_id);
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "100000000");
    props.put("session.timeout.ms", "10000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("max.poll.records", "40000");
    props.put("auto.offset.reset", "latest");

我不确定我做错了什么。

9rnv2umw

9rnv2umw1#

检查我们主题的保留策略可能在这种情况下,当您重新启动使用者时,上次提交的偏移量可能已从分区中清除,使用者将前进到该分区的最新偏移量。

o8x7eapl

o8x7eapl2#

当您使用consumerapi轮询kafka时,它会读取分区中最后一次使用的偏移量。系统中必须有其他使用者,这些使用者必须获得您刚刚停止的示例先前使用过的分区—因此最新偏移量将发生更改。因为您知道退出前处于哪个偏移量,所以需要将其保存到某个耐用的存储区-使用 ConsumerRebalanceListener#onPartitionsRevoked 为了这个。当您重新启动消费者进程时读取该偏移量,并指示消费者从那里开始-通过调用 seek(partition, offset)ConsumerRebalanceListener#onPartitionsAssigned

uyto3xhc

uyto3xhc3#

我认为你假设的“关闭前偏移量:3107169023”基于你的关闭钩子打印的内容是正确的吗?
如果是,我看到两个潜在的问题。
当您注册关闭钩子时,您正在关闭lastcommitdata字段。
既然您是从另一个线程shutdown hook线程访问它,那么该字段是否声明为volatile?否则,您可能正在打印过时的值。
另外,java.lang.runtime.addshutdownhook说:
当虚拟机开始其关闭序列时,它将以某种未指定的顺序启动所有注册的关闭挂钩,并让它们同时运行
因此,不能保证在关闭挂钩已经打印了lastcommitdata值之后,您的使用者不会进一步提交偏移量。
我建议你检查Kafka,检查什么是你的应用程序关闭后,以确保实际提交的偏移量。

相关问题