Kafka消费者陷入再平衡状态

vsnjm48y  于 2023-11-16  发布在  Apache
关注(0)|答案(1)|浏览(158)

我们有一个Kafka消费者,突然(没有任何活动)进入重新平衡状态并卡住了。这导致k8 pod的CPU射击和GC时间也接近70- 80%。节点没有从该状态恢复。删除所有主题后,它几乎在4-5个小时后恢复。
Kafka版本- 2.1.1主题数量- 520(每个分区10个)消费者组- 1个分区分配策略-粘性
附上当时的一些信息日志。

2021-10-12 10:41:21 
2021-10-12 05:11:21.160   INFO 6 CID:  UID:  RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-10, groupId=staging_notification_consumer] Member consumer-staging_notification_consumer-10-8a7eb399-c4e0-4443-b330-bfac42ca89ae sending LeaveGroup request to coordinator kafka5:9092 (id: 2147483642 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

2021-10-12 05:10:27.340   INFO 6 CID:4967  UID:  RID:17c72e2d517-4d5e --- [ntainer#9-0-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-9, groupId=staging_notification_consumer] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group

2021-10-12 10:40:27 
2021-10-12 05:10:27.340   INFO 6 CID:4967  UID:  RID:17c72e2d517-4d5e --- [ntainer#9-0-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-9, groupId=staging_notification_consumer] Lost previously assigned partitions staging_notification_topic_app_low_mobi_3119-4, staging_notification_topic_app_low_mobi_2023-4, staging_notification_topic_app_low_mobi_5170-9, staging_notification_topic_app_low_mobi_3540-9, staging_notification_topic_app_low_mobi_5722-9,

2021-10-12 10:40:37 
2021-10-12 05:10:37.247   INFO 6 CID:  UID:  RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing
2021-10-12 10:40:37 
2021-10-12 05:10:37.183   INFO 6 CID:  UID:  RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-10, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing

2021-10-12 10:40:57 
2021-10-12 05:10:57.435   INFO 6 CID:  UID:  RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-10, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing
2021-10-12 10:40:57 
2021-10-12 05:10:57.435   INFO 6 CID:  UID:  RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing


2021-10-12 10:41:56 
2021-10-12 05:11:56.922   INFO 6 CID:4967  UID:  RID:17c72e2579c-22e6 --- [ntainer#9-3-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Failing OffsetCommit request since the consumer is not part of an active group
2021-10-12 10:41:56 
2021-10-12 05:11:56.923  ERROR 6 CID:4967  UID:  RID:17c72e2579c-22e6 --- [ntainer#9-3-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
2021-10-12 10:41:56 
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124)
2021-10-12 10:41:56 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
2021-10-12 10:41:56 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2021-10-12 10:41:56 
    at java.lang.Thread.run(Thread.java:748)
2021-10-12 10:41:56 
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2021-10-12 10:41:56 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134)
2021-10-12 10:41:56 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999)
2021-10-12 10:41:56 
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2396)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2391)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2377)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2191)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1149)
2021-10-12 10:41:56 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
2021-10-12 10:41:56 
    ... 3 common frames omitted

2021-10-12 10:41:58 
2021-10-12 05:11:58.442   INFO 6 CID:4967  UID:  RID:17c72e2579c-22e6 --- [ntainer#9-3-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group

021-10-12 10:58:36  
2021-10-12 05:28:36.039   INFO 6 CID:4412  UID:  RID:17c72ebadab-64d7 --- [ntainer#3-1-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-17, groupId=staging_notification_consumer] Join group failed with org.apache.kafka.common.errors.RebalanceInProgressException: The group is rebalancing, so a rejoin is needed.
2021-10-12 10:58:36 
2021-10-12 05:28:36.044   INFO 6 CID:4412  UID:  RID:17c72ebadab-64d7 --- [ntainer#3-1-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-17, groupId=staging_notification_consumer] (Re-)joining group

021-10-12 10:58:36  
2021-10-12 05:28:36.039   INFO 6 CID:4412  UID:  RID:17c72ebadab-64d7 --- [ntainer#3-1-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-17, groupId=staging_notification_consumer] Join group failed with org.apache.kafka.common.errors.RebalanceInProgressException: The group is rebalancing, so a rejoin is needed.
2021-10-12 10:58:36 
2021-10-12 05:28:36.044   INFO 6 CID:4412  UID:  RID:17c72ebadab-64d7 --- [ntainer#3-1-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-17, groupId=staging_notification_consumer] (Re-)joining group



2021-10-12 11:05:36 
2021-10-12 05:35:36.955   INFO 6 CID:3435  UID:  RID:17c72f29913-886c --- [ntainer#7-3-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-5, groupId=staging_notification_consumer] Join group failed with org.apache.kafka.common.errors.DisconnectException

字符串
这个问题在我删除主题时得到了解决。

类似的行为当我增加并发侦听器工厂中的线程数量时,消费者无法获得类似的不断重新平衡日志

eimct9ow

eimct9ow1#

    • 第一个日志表示后续调用poll()之间的时间间隔长于配置的max.poll.interval.ms**(默认为5分钟)。

当这种情况发生时,消费者客户端将主动向协调器发起LeaveGroup请求,以触发rebalance
您可以通过增加max.poll.interval.ms或通过使用max.poll.records减少poll()中返回的批处理的最大大小来解决此问题。
当然,更好的方法是检查程序处理缓慢的原因并对其进行优化。

相关问题