我有一个主题消费者碰巧处理慢了,被Kafka踢了一脚,导致
引发原因:org.apache.Kafka.clients.consumer.提交失败异常:无法完成偏移量提交,因为使用者不是自动分区分配的活动组的一部分;消费者很可能被踢出群。
虽然我知道原因和相关的配置选项(max.poll.interval.ms等),但我仍然不满意的是,在应用程序重新启动之前,消费者不会处理任何消息。更糟糕的是,在一段时间后,消费者组本身似乎被Kafka删除,因此当应用程序重新启动时,它已经丢失了偏移量。
当然,人们可以实施监控来识别滞后或丢失的消费者群体,但我更喜欢主动的、自我修复的实施,而不是依赖于坏事情已经发生时的警报。
1.有没有一种方法可以让消费者自动重新连接到服务器,而不需要重新启动整个应用程序?我错过了一些配置选项吗?
1.我尝试实现了一个Spring运行状况检查,因此如果消费者被踢出组,应用程序将报告“DOWN(所以至少Kubernetes可以自动重新启动),但是我在客户端没有检测到任何东西。我尝试注入KafkaListenerEndpointRegistry并迭代ListenerContainers。虽然其中一个被踢出了组,所有的容器都报告“running=true”,并且没有提供任何标志、属性或方法来指示错误。2有什么方法可以从spring Kafka客户端检测到断开连接的消费者吗?
1条答案
按热度按时间mftmpeh81#
您的第一个问题:
有没有一种方法可以让消费者自动重新连接到服务器,而不需要重新启动整个应用程序?我错过了一些配置选项吗?
Kafka brokers / clients(v2.3)中引入的静态成员资格选项将缓解此行为,您需要做的所有配置如下所示“从文档链接关于”
基本上,静态成员身份特性所做的是,每当消费者线程加入一个组时,它将获取一个
group.instance.id
,因此在此消费者示例中(线程)出于任何原因离开组,这将不会触发整个消费者组上的重新平衡,然而,连接到该(死的)消费者线程的分区将是空闲的,并且没有处理发生,直到该X1 M1 N1 X再次恢复活动。inter.broker.protocol.version
。ConsumerConfig#GROUP_INSTANCE_ID_CONFIG
设置为唯一值。ConsumerConfig#GROUP_INSTANCE_ID_CONFIG
就足够了,与示例使用的线程数无关。我建议阅读这篇article文章,以获得有关静态成员资格与动态成员资格的更多详细信息
对于您的第二个问题:
我尝试实现Spring运行状况检查,因此应用程序将报告
是的,如果您使用Kafka-Streams API,则可以将StateListener注册为以下示例