kafka控制台生产者和使用者的相关id错误

aiazj4mn  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(307)

我有两个Kafka由3个zk节点支持。我想通过在每个节点上本地运行kafka控制台producer和-consumer来测试kafka节点。
所以我用两个不同的终端ssh到我的一个kafka代理。在terminal#1中,我像这样运行消费者:

/opt/kafka/bin/kafka-console-consumer.sh --zookeeper a.b.c.d:2181 --topic test1

其中a.b.c.d是我的3个zk节点之一的私有ip。
然后在2号航站楼,我像这样管理制片人:

/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1

我能够启动消费者和生产者都没有任何问题。
但是,在producer终端中,如果我通过输入一些文本(例如“hello”)并按enter键在test1主题中“激发”一条消息,我会立即看到:

[2017-01-17 19:45:57,353] WARN Error while fetching metadata with correlation id 0 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-01-17 19:45:57,372] WARN Error while fetching metadata with correlation id 1 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-01-17 19:45:57,477] WARN Error while fetching metadata with correlation id 2 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-01-17 19:45:57,582] WARN Error while fetching metadata with correlation id 3 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
...and it keeps going!

而且,在消费终端中,尽管我启动消费终端时没有收到任何错误,但大约30秒后,我会收到以下警告消息:

[2017-01-17 19:46:07,292] WARN Fetching topic metadata with correlation id 1 for topics [Set(test1)] from broker [BrokerEndPoint(1,ip-x-y-z-w.ec2.internal,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

有趣的是,ip-x-y-z-w.ec2.internal是另一个kafka代理的私有dns,所以这可能是代理间通信中的某种故障?
关于这里发生了什么以及我能做些什么来排除故障,有什么想法吗?

更新

这是我的全部 server.properties 两个kafkas节点的文件:

listeners=PLAINTEXT://0.0.0.0:9092
advertised.host.name=<private-aws-ec2-ip-addr>.ec2.internal
advertised.listeners=PLAINTEXT://0.0.0.0:9092
broker.id=1
port=9092
num.partitions=4
zookeeper.connect=zkA:2181,zkB:2181,zkC:2181
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
log.dirs=/tmp/kafka-logs
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
offset.metadata.max.bytes=4096

如果有什么东西看起来像臭味,请告诉我。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题