我的印象是,两个经纪人同步打开我的Kafka设置应该继续工作,即使其中一个经纪人失败。
为了测试它,我做了一个名为topicname的新主题。其说明如下:
Topic:topicname PartitionCount:1 ReplicationFactor:1 Configs:
Topic: topicname Partition: 0 Leader: 0 Replicas: 0 Isr: 0
然后我按以下方式运行producer.sh和consumer.sh:
bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9095 sync --topic topicname
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicname --from-beginning
在两个代理都工作之前,我看到消息被消费者正确地接收到,但是当我通过 kill
命令,然后消费者停止向我显示任何新消息。相反,它显示了以下错误消息:
WARN [ConsumerFetcherThread-console-consumer-57116_ip-<internalipvalue>-1438604886831-603de65b-0-0], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 865; ClientId: console-consumer-57116; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [topicname,0] -> PartitionFetchInfo(9,1048576). Possible cause: java.nio.channels.ClosedChannelException (kafka.consumer.ConsumerFetcherThread)
[2015-08-03 12:29:36,341] WARN Fetching topic metadata with correlation id 1 for topics [Set(topicname)] from broker [id:0,host:<hostname>,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
5条答案
按热度按时间hl0ma9xz1#
对于复制因子为n的主题,kafka最多可以容忍n-1个服务器故障。e、 有一个复制因子3将允许您处理多达2个服务器故障。
ssm49v7z2#
您可以在发布的主题描述中看到,您的主题只有一个副本。对于单个副本,没有容错性,如果代理0(包含副本的代理)消失,则主题将不可用。
创建一个包含更多副本的主题(使用
--replication-factor 3
)在发生碰撞时具有容错能力。p1tboqfb3#
我认为有两件事会让你的消费者在Kafkaha集群的经纪人破产后无法工作:
--对于您的主题,复制因子应该大于1。所以每个主题分区至少可以有一个备份。
Kafka配置的内部主题的复制因子也应大于1:
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
这两个修改使得我的生产者和消费者在代理关闭后仍然可以工作(5个代理和每个代理关闭一次)。
ivqmmu1c4#
即使在使用复制因子为2的主题时,我也遇到了同样的问题。为制作者设置以下属性对我很有用。”metadata.max.age.ms”(Kafka-0.8.2.1)
否则,我的制作人会在默认情况下等待1分钟,以获取新的领导并开始联系它
a64a0gku5#
我遇到了类似的问题,将producer config“topic.metadata.refresh.interval.ms”设置为-1(或者任何适合您的值)就解决了这个问题。所以在我的例子中,我有3个代理(在我的本地机器上设置了多个代理),并创建了带有3个分区和复制因子2的主题。
测试设置:
在生产者配置之前:
尝试运行3个代理,在生产者启动后杀死其中一个代理,本地zookeeper更新了isr和主题元数据信息(删除了作为领导者的代理),但生产者没有接收到它(可能是由于默认的10分钟刷新时间)。因此消息最终失败。我收到了例外。
在producer配置之后(在我的示例中为-1):
尝试运行3个代理,在生产者启动后杀死其中一个代理,本地zookeeper更新了isr信息(删除了作为领导者的代理),生产者刷新了新的isr/主题元数据信息,消息发送没有失败。
-1使其在每次失败尝试时刷新主题元数据,因此您可能希望将刷新时间缩短到合理的程度。