java 如何正确处理CommitFailedException:“......很可能是消费者被踢出群了,"?

rekjcdws  于 2023-03-11  发布在  Java
关注(0)|答案(1)|浏览(143)

我有一个主题消费者碰巧处理慢了,被Kafka踢了一脚,导致
引发原因:org.apache.Kafka.clients.consumer.提交失败异常:无法完成偏移量提交,因为使用者不是自动分区分配的活动组的一部分;消费者很可能被踢出群。
虽然我知道原因和相关的配置选项(max.poll.interval.ms等),但我仍然不满意的是,在应用程序重新启动之前,消费者不会处理任何消息。更糟糕的是,在一段时间后,消费者组本身似乎被Kafka删除,因此当应用程序重新启动时,它已经丢失了偏移量。
当然,人们可以实施监控来识别滞后或丢失的消费者群体,但我更喜欢主动的、自我修复的实施,而不是依赖于坏事情已经发生时的警报。
1.有没有一种方法可以让消费者自动重新连接到服务器,而不需要重新启动整个应用程序?我错过了一些配置选项吗?
1.我尝试实现了一个Spring运行状况检查,因此如果消费者被踢出组,应用程序将报告“DOWN(所以至少Kubernetes可以自动重新启动),但是我在客户端没有检测到任何东西。我尝试注入KafkaListenerEndpointRegistry并迭代ListenerContainers。虽然其中一个被踢出了组,所有的容器都报告“running=true”,并且没有提供任何标志、属性或方法来指示错误。2有什么方法可以从spring Kafka客户端检测到断开连接的消费者吗?

mftmpeh8

mftmpeh81#

您的第一个问题:

有没有一种方法可以让消费者自动重新连接到服务器,而不需要重新启动整个应用程序?我错过了一些配置选项吗?
Kafka brokers / clients(v2.3)中引入的静态成员资格选项将缓解此行为,您需要做的所有配置如下所示“从文档链接关于”
基本上,静态成员身份特性所做的是,每当消费者线程加入一个组时,它将获取一个group.instance.id,因此在此消费者示例中(线程)出于任何原因离开组,这将不会触发整个消费者组上的重新平衡,然而,连接到该(死的)消费者线程的分区将是空闲的,并且没有处理发生,直到该X1 M1 N1 X再次恢复活动。

  • 将代理集群和客户端应用程序都升级到2.3或更高版本,同时确保升级后的代理使用的是2.3或更高版本的inter.broker.protocol.version
  • 对于一个组下的每个使用者示例,将config ConsumerConfig#GROUP_INSTANCE_ID_CONFIG设置为唯一值。
  • 对于Kafka Streams应用程序,为每个KafkaStreams示例设置唯一的ConsumerConfig#GROUP_INSTANCE_ID_CONFIG就足够了,与示例使用的线程数无关。

我建议阅读这篇article文章,以获得有关静态成员资格与动态成员资格的更多详细信息

对于您的第二个问题:

我尝试实现Spring运行状况检查,因此应用程序将报告
是的,如果您使用Kafka-Streams API,则可以将StateListener注册为以下示例

streams.setStateListener((KafkaStreams.State newState, KafkaStreams.State old)->{
                logger.info()
                        .message("Thread state has changed from {" + old.name() + "} to {" + newState.name() + "}")
                        .log();
                // Here you can set any global state that you can expose via api and enquire on it using K8S health check.
                }
            });

相关问题