创建主题,但在kubernetes上使用python获取kafka失败的payloadsError

qxgroojn  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(386)

我使用的是python-kafka库中的simpleproducer。这个脚本在我之前尝试过的其他更硬配置的Kafka设置中工作得非常完美。

kafka = KafkaClient(u'[masterNodeIp]:[servicePort]')
producer = SimpleProducer(kafka)

# make a simple message, while true run

producer.send_messages(b'oneMoreTopic', sentence)

在运行这个脚本一次之后,我在python控制台中得到了这个响应。

kafka.common.LeaderNotAvailableError: TopicMetadata(topic='oneMoreTopic', error=5, partitions=[])

然后我可以进入zookeeper.log上的节点并查看:

2015-09-14 12:16:32,276 - INFO  [ProcessThread(sid:3  cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x34fcb982d030000 type:setData cxid:0x71 zxid:0x1000000d8 txntype:-1 reqpath:n/a Error Path:/config/topics/oneMoreTopic Error:KeeperErrorCode = NoNode for /config/topics/oneMoreTopic
2015-09-14 12:16:32,278 - INFO  [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x34fcb982d030000 type:create cxid:0x72 zxid:0x1000000d9 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics
2015-09-14 12:16:32,302 - INFO  [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x34fcb982d030000 type:create cxid:0x7b zxid:0x1000000dc txntype:-1 reqpath:n/a Error Path:/brokers/topics/oneMoreTopic/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/oneMoreTopic/partitions/0
2015-09-14 12:16:32,304 - INFO  [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when processing sessionid:0x34fcb982d030000 type:create cxid:0x7c zxid:0x1000000dd txntype:-1 reqpath:n/a Error Path:/brokers/topics/oneMoreTopic/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/oneMoreTopic/partitions

这似乎只是Zookeeper为主题创建了一个新的znode,因为它从以前就不存在了。kafka server.log打印:

[2015-09-14 12:16:32,282] INFO Topic creation {"version":1,"partitions":{"0":[10200119]}} (kafka.admin.AdminUtils$)
[2015-09-14 12:16:32,287] INFO [KafkaApi-10200219] Auto creation of topic oneMoreTopic with 1 partitions and replication factor 1 is successful! (kafka.server.KafkaApis)
[2015-09-14 12:16:51,579] INFO Closing socket connection to /10.240.1.94. (kafka.network.Processor)

但是,我的消息从未发布到主题,下次运行python脚本时,我总是得到:

kafka.common.FailedPayloadsError

在我使其工作的情况下,advised.host.name始终是节点的外部ip,但我似乎无法通过kubernetes使其工作。是否可以从容器parhaps调用外部ip?
对于所有代理程序,我的kafka/config/server.properties如下所示:

broker.id=10200121
host.name=kafka-f8p06
advertised.host.name=kafka-f8p06
++
yb3bgrhw

yb3bgrhw1#

我的问题的根本原因是通过确保我的所有节点都可以通过

sudo iptables -t nat -A POSTROUTING ! -d 10.0.0.0/8 -o ens4v1 -j MASQUERADE

在此之后,我将advised.host.name更改为尝试从中访问我的代理的外部ip地址。因此,如果我的gce节点位于156.99.33.101上,并且在33777上为我的Kafka服务打开了一个节点端口,我会:

advertised.host.name=156.99.33.101
advertised.host.port=33777

当其中一个代理收到请求时,kafka使用advised.host.name连接回自身,因此使用外部地址至少可以访问它。虽然我不确定是否有任何后果,以指导它以外的本地地址空间。
关于leadernotavailableexception的主题

n9vozmp4

n9vozmp42#

broker.id=10200121
host.name=kafka-f8p06  <----- use IP here
advertised.host.name=kafka-f8p06  <---- use IP here

我想你应该给我买ips host.name 以及 advertised.host.name 因为k8s不是通过主机名解析pods,而是通过ip解析pods。
你的Kafka节点可能无法以这种方式相互交谈,也找不到领导者。

相关问题