Kafka领导人选举导致Kafka流崩溃

nimxete2  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(441)

我有一个kafka streams应用程序,它从一个kafka集群消费并产生一个kafka集群,该集群有3个代理和3个复制因子。除了使用者偏移主题(50个分区)之外,所有其他主题每个都只有一个分区。
当代理尝试首选副本选择时,streams应用程序(在与代理完全不同的示例上运行)失败,并出现以下错误:

Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
    ...
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

如果streams应用程序运行在不属于kafka集群的服务器上,那么它试图成为分区的领导者是正常的吗?
我可以根据需要通过以下方式重现这种行为:
杀死其中一个代理(因此,其他2个代理将接管所有分区的领导者,这些分区的领导者是被杀死的代理,如预期的那样)
把被杀的经纪人带回来
触发首选副本领导人选举 bin/kafka-preferred-replica-election.sh --zookeeper localhost 我的问题似乎类似于这个失败的报道,所以我想知道这是否是一个新的Kafka流错误。我的完整堆栈跟踪与报告的失败中链接的要点(这里)完全相同。
另一个可能有趣的细节是,在领导人选举期间,我在 controller.log 经纪人姓名:

[2017-04-12 11:07:50,940] WARN [Controller-3-to-broker-3-send-thread], Controller 3's connection to broker BROKER-3-HOSTNAME:9092 (id: 3 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to BROKER-3-HOSTNAME:9092 (id: 3 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

我最初以为是这个连接错误造成的,但在领导人选举使streams应用程序崩溃之后,如果我重新启动streams应用程序,它将正常工作,直到下一次选举,而我根本不会接触代理程序。
所有服务器(3个kafka代理和streams应用程序)都在ec2示例上运行。

ubbxdtey

ubbxdtey1#

这在0.10.2.1中已修复。如果您无法获取,请确保在streams配置中设置以下两个参数:

final Properties props = new Properties();
...
props.put(ProducerConfig.RETRIES_CONFIG, 10);  
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));

相关问题