fetch offset 5705超出分区的范围,正在重置偏移量

eqzww0vc  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(567)

我每次在Kafka消费时都会收到下面的信息。

2020-07-04 14:54:27.640  INFO 1 --- [istener-0-0-C-1] c.n.o.c.h.p.n.PersistenceKafkaConsumer   : beginning to consume batch messages , Message Count :11
2020-07-04 14:54:27.809  INFO 1 --- [istener-0-0-C-1] c.n.o.c.h.p.n.PersistenceKafkaConsumer   : Execution Time :169
2020-07-04 14:54:27.809  INFO 1 --- [istener-0-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {nbi.cm.changes.mo.test23-1=OffsetAndMetadata{offset=5705, leaderEpoch=null, metadata=''}}
2020-07-04 14:54:27.812  INFO 1 --- [istener-0-0-C-1] c.n.o.c.h.p.n.PersistenceKafkaConsumer   : Acknowledgment Success
2020-07-04 14:54:27.813  INFO 1 --- [istener-0-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-1, groupId=cm-persistence-notification] Fetch offset 5705 is out of range for partition nbi.cm.changes.mo.test23-1, resetting offset
2020-07-04 14:54:27.820  INFO 1 --- [istener-0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-1, groupId=cm-persistence-notification] Resetting offset for partition nbi.cm.changes.mo.test23-1 to offset 666703.

调试日志中出现偏移量超出范围错误,正在重置为实际不存在的其他分区。在用户控制台中能够接收的所有消息都相同。
但实际上我之前只提交了offset,offset在kafka中是可用的,日志保留策略是24小时,所以在kafka中它不会被删除。
在调试日志中,我得到以下消息:

beginning to consume batch messages , Message Count :710
2020-07-02 04:58:31.486 DEBUG 1 --- [ce-notification] o.a.kafka.clients.FetchSessionHandler    : [Consumer clientId=consumer-1, groupId=cm-persistence-notification] Node 1002 sent an incremental fetch response for session 253529272 with 1 response partition(s)
2020-07-02 04:58:31.486 DEBUG 1 --- [ce-notification] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-1, groupId=cm-persistence-notification] Fetch READ_UNCOMMITTED at offset 11372 for partition nbi.cm.changes.mo.test12-1 returned fetch data (error=OFFSET_OUT_OF_RANGE, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=0)

当所有的偏移量超出范围时。
侦听器类:

@KafkaListener( id = "batch-listener-0", topics = "topic1", groupId = "test", containerFactory = KafkaConsumerConfiguration.CONTAINER_FACTORY_NAME )
    public void receive(
        @Payload List<String> messages,
        @Header( KafkaHeaders.RECEIVED_MESSAGE_KEY ) List<String> keys,
        @Header( KafkaHeaders.RECEIVED_PARTITION_ID ) List<Integer> partitions,
        @Header( KafkaHeaders.RECEIVED_TOPIC ) List<String> topics,
        @Header( KafkaHeaders.OFFSET ) List<Long> offsets,
        Acknowledgment ack )
    {
        long startTime = System.currentTimeMillis();

        handleNotifications( messages ); // will take more than 5s to process all messages

        long endTime = System.currentTimeMillis();

        long timeElapsed = endTime - startTime;

        LOGGER.info( "Execution Time :{}", timeElapsed );

        ack.acknowledge();

        LOGGER.info( "Acknowledgment Success" );

    }

我需要关闭消费者在这里,我想SpringKafka自动照顾那些,如果没有请你告诉如何关闭在一个Kafka,也如何检查是否发生了再平衡,因为在调试日志中无法看到任何日志相关的再平衡。

gcuhipw9

gcuhipw91#

我认为你的消费者可能正在重新平衡,因为你没有打电话 consumer.close() 在你的过程结束时。
这是一个猜测,但如果保留策略没有启动(并且日志没有被删除),这是我可以判断这种行为的唯一原因。
更新:
当你把他们设定为 @KafkaListeners ,你可以打电话给我 stop() 在Kafka的注册处: kafkaListenerEndpointRegistry.stop()

相关问题