消息不是用kafka python传递的

agyaoht7  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(405)

我正在尝试用kafkapython设置一个简单的kafka应用程序。我一直在努力让我在网上找到的一些例子发挥作用,但似乎无法做到。我有一个kafka示例在docker容器中运行。我测试了shell工具,这个示例肯定能正常工作。我能够发送和接收信息。我怀疑制作人的留言超时了。下面是两个行为基本相同的代码版本:

import time
from kafka import SimpleProducer, KafkaClient

# connect to Kafka

kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)

# Assign a topic

topic = 'test'
producer.send_messages(topic, b'this is a message')

第二个版本:

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['0.0.0.0:9092'], api_version=(0,10))
topic = "test"

producer.send(topic, b'test message')
2jcobegt

2jcobegt1#

这取决于你如何运行docker,但我相信你的问题是你试图连接到的主机名。您需要指向 ADVERTISED_HOST 环境变量。例如,当我以 docker run --hostname kafka-1 -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST='kafka-1' --env ADVERTISED_PORT=9092 spotify/kafka 我像这样向Kafka生产

from kafka import SimpleProducer, KafkaClient

kafka = KafkaClient('kafka-1:9092')
producer = SimpleProducer(kafka)
topic = 'test'
for i in range(100):
    producer.send_messages(topic, 'hullo-' + str(i))

另外我需要补充 127.0.0.1 kafka-1 给我的 /etc/hosts 文件。完成此操作后,我能够使用 bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic test --from-beginning

gzszwxb4

gzszwxb42#

更改行: producer.send(topic, b'test message') 收件人: producer.send(topic, b'test message').get(timeout=30) (或您认为合适的任何值)
问题是,由于此方法是异步的,所以在发送消息之前会终止生产者。如果你加上:

import logging
logging.basicConfig(level=logging.INFO)

然后查看超时值为0。

相关问题