我们有一个Kafka消费者,突然(没有任何活动)进入重新平衡状态并卡住了。这导致k8 pod的CPU射击和GC时间也接近70- 80%。节点没有从该状态恢复。删除所有主题后,它几乎在4-5个小时后恢复。
Kafka版本- 2.1.1主题数量- 520(每个分区10个)消费者组- 1个分区分配策略-粘性
附上当时的一些信息日志。
2021-10-12 10:41:21
2021-10-12 05:11:21.160 INFO 6 CID: UID: RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-10, groupId=staging_notification_consumer] Member consumer-staging_notification_consumer-10-8a7eb399-c4e0-4443-b330-bfac42ca89ae sending LeaveGroup request to coordinator kafka5:9092 (id: 2147483642 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-10-12 05:10:27.340 INFO 6 CID:4967 UID: RID:17c72e2d517-4d5e --- [ntainer#9-0-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-9, groupId=staging_notification_consumer] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-10-12 10:40:27
2021-10-12 05:10:27.340 INFO 6 CID:4967 UID: RID:17c72e2d517-4d5e --- [ntainer#9-0-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-9, groupId=staging_notification_consumer] Lost previously assigned partitions staging_notification_topic_app_low_mobi_3119-4, staging_notification_topic_app_low_mobi_2023-4, staging_notification_topic_app_low_mobi_5170-9, staging_notification_topic_app_low_mobi_3540-9, staging_notification_topic_app_low_mobi_5722-9,
2021-10-12 10:40:37
2021-10-12 05:10:37.247 INFO 6 CID: UID: RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing
2021-10-12 10:40:37
2021-10-12 05:10:37.183 INFO 6 CID: UID: RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-10, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing
2021-10-12 10:40:57
2021-10-12 05:10:57.435 INFO 6 CID: UID: RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-10, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing
2021-10-12 10:40:57
2021-10-12 05:10:57.435 INFO 6 CID: UID: RID: --- [cation_consumer] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Attempt to heartbeat failed since group is rebalancing
2021-10-12 10:41:56
2021-10-12 05:11:56.922 INFO 6 CID:4967 UID: RID:17c72e2579c-22e6 --- [ntainer#9-3-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Failing OffsetCommit request since the consumer is not part of an active group
2021-10-12 10:41:56
2021-10-12 05:11:56.923 ERROR 6 CID:4967 UID: RID:17c72e2579c-22e6 --- [ntainer#9-3-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
2021-10-12 10:41:56
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
2021-10-12 10:41:56
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151)
2021-10-12 10:41:56
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124)
2021-10-12 10:41:56
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
2021-10-12 10:41:56
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2021-10-12 10:41:56
at java.lang.Thread.run(Thread.java:748)
2021-10-12 10:41:56
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2021-10-12 10:41:56
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134)
2021-10-12 10:41:56
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999)
2021-10-12 10:41:56
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2396)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2391)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2377)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2191)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1149)
2021-10-12 10:41:56
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
2021-10-12 10:41:56
... 3 common frames omitted
2021-10-12 10:41:58
2021-10-12 05:11:58.442 INFO 6 CID:4967 UID: RID:17c72e2579c-22e6 --- [ntainer#9-3-C-1] s.consumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-staging_notification_consumer-12, groupId=staging_notification_consumer] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
021-10-12 10:58:36
2021-10-12 05:28:36.039 INFO 6 CID:4412 UID: RID:17c72ebadab-64d7 --- [ntainer#3-1-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-17, groupId=staging_notification_consumer] Join group failed with org.apache.kafka.common.errors.RebalanceInProgressException: The group is rebalancing, so a rejoin is needed.
2021-10-12 10:58:36
2021-10-12 05:28:36.044 INFO 6 CID:4412 UID: RID:17c72ebadab-64d7 --- [ntainer#3-1-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-17, groupId=staging_notification_consumer] (Re-)joining group
021-10-12 10:58:36
2021-10-12 05:28:36.039 INFO 6 CID:4412 UID: RID:17c72ebadab-64d7 --- [ntainer#3-1-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-17, groupId=staging_notification_consumer] Join group failed with org.apache.kafka.common.errors.RebalanceInProgressException: The group is rebalancing, so a rejoin is needed.
2021-10-12 10:58:36
2021-10-12 05:28:36.044 INFO 6 CID:4412 UID: RID:17c72ebadab-64d7 --- [ntainer#3-1-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-17, groupId=staging_notification_consumer] (Re-)joining group
2021-10-12 11:05:36
2021-10-12 05:35:36.955 INFO 6 CID:3435 UID: RID:17c72f29913-886c --- [ntainer#7-3-C-1] s.consumer.internals.AbstractCoordinator : [Consumer clientId=consumer-staging_notification_consumer-5, groupId=staging_notification_consumer] Join group failed with org.apache.kafka.common.errors.DisconnectException
字符串
这个问题在我删除主题时得到了解决。
类似的行为当我增加并发侦听器工厂中的线程数量时,消费者无法获得类似的不断重新平衡日志
1条答案
按热度按时间eimct9ow1#
当这种情况发生时,消费者客户端将主动向协调器发起LeaveGroup请求,以触发rebalance。
您可以通过增加max.poll.interval.ms或通过使用max.poll.records减少poll()中返回的批处理的最大大小来解决此问题。
当然,更好的方法是检查程序处理缓慢的原因并对其进行优化。