kafka使用者在mesos上运行“未能为分区添加引线”错误

5t7ly7z5  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(364)

我正在使用mesos/kafka库运行一个由6个代理组成的kafka集群。我能够在6台不同的机器上添加和启动代理,并使用pythonsimpleproducer和kafka-console-producer.sh脚本将消息发布到集群中。
然而,我不能让消费者正常工作。我正在运行以下使用者命令:

bin/kafka-console-consumer.sh --zookeeper 192.168.1.199:2181 --topic test --from-beginning --consumer.config config/consumer.properties --delete-consumer-offsets

在consumer.properties文件中,我将group.id设置为 my.group 并设置 zookeeeper.connect 到zookeeper集合中的多个节点。运行此消费者时,我收到以下警告消息:

[2015-09-24 16:01:06,609] WARN [my.group_my_host-1443106865779-b5a3a1e1-leader-finder-thread], Failed to add l
    eader for partitions [test,4],[test,1],[test,5],[test,2],[test,0],[test,3]; will retry (kafka.consumer.ConsumerFetcherM
    anager$LeaderFinderThread)
    java.nio.channels.ClosedChannelException
            at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
            at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
            at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
            at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
            at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:166)
            at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
            at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:177)
            at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:172)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
            at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
            at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:172)
            at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:87)
            at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:77)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
            at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
            at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
            at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:77)
            at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
    {'some':2}
    [2015-09-24 16:20:02,362] WARN [my.group_my_host-1443108001180-fa0c93e4-leader-finder-thread], Failed to add leader for partitions [test,4],[test,1],[test,5],[test,2],[test,0],[test,3]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
    java.nio.channels.ClosedChannelException
            at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
            at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
            at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
            at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
            at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:166)
            at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
            at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:177)
            at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:172)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
            at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
            at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:172)
            at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:87)
            at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:77)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
            at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
            at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
            at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:77)
            at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
    ...
    // Lots more of this
    ...
    Consumed 1 messages

我不知道为什么它不能添加一个领导人,领导人似乎已经在zookeeper。除了所有这些错误信息外,我只能向消费者传递一条信息。绳子 {'some':2} 是我从游戏机制作人那里发来的消息。
我在文件中发现了这个错误 server.log 其中一个mesos奴隶,不确定是否相关:

[2015-09-24 17:09:41,926] ERROR Closing socket for /192.168.1.199 because of error (kafka.network.Processor)
java.io.IOException: Broken pipe
            at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
            at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
            at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
            at sun.nio.ch.IOUtil.write(IOUtil.java:65)
            at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
            at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
            at kafka.network.MultiSend.writeTo(Transmission.scala:101)
            at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
            at kafka.network.Processor.write(SocketServer.scala:472)
            at kafka.network.Processor.run(SocketServer.scala:342)
            at java.lang.Thread.run(Thread.java:745)

关于消费者可能发生的情况或我可能在哪里解决问题,有什么建议吗?
其中一个日志分区的zookeeper broker分区状态:

[zk: localhost:2181(CONNECTED) 166] get /brokers/topics/test/partitions/0/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}

操作系统:ubuntu 14.0.4 mesos:0.23Kafka:2.10-0.8.2.1
更新:使用 kafka-console-consumer.sh 这些信息似乎正在传递。错误消息是恒定的,因此您不会看到中的所有消息 stdout . Python KafkaConsumer 立即失败 FailedPayloadsError .

chhqkbe1

chhqkbe11#

我认为你需要调查“adverted.host.name”这个属性的价值。我最近也遇到了这个问题,并修复了使用上述属性。
请确保您为每个代理提到了正确的ip地址。
如果不行就告诉我。

oknwwptz

oknwwptz2#

我的问题是:
运行zookeeper
已创建主题
跑Kafka
然后我得到“没有领导发现例外”
但是当我在zookeeper和kafka正常运行时创建一个主题时,它工作正常。

1rhkuytd

1rhkuytd3#

尝试运行以下命令:

bin/kafka-topics.sh --zookeeper your.zookeeper:2181 --describe --topic your_topic

这将显示哪个代理是每个主题分区的领导者(有关更多详细信息,请参阅此链接:http://kafka.apache.org/documentation.html#quickstart_multibroker)
在我的案例中,一个被设定为领导者的经纪人已经失败,不再存在。应该指派一位新的领导人,但由于某种原因没有指派。
我通过以下方法解决了这个问题:
阻止所有生产者和消费者
重新启动剩余的每个代理
然后我重新运行了 describe 命令,可以看到失败的代理不再被列为领导者。
然后我提出了一个与失败的代理具有相同id的新代理。Kafka从那里得到了它,并从我的其他经纪人那里带来了所有的数据(这要求你的主题有一个足够的复制因子)。一旦数据结束,kafka就让代理成为分区的领导者。
最后,我重新启动了生产者和消费者。

相关问题