使用循环调用kakfa product类时,无法将消息写入kafka主题(producer)。
我对Python和Kafka很陌生。我正在尝试编写一个python程序,将消息写入kafka主题并生成,这样kafka消费者就可以订阅该主题来发布消息。
我不确定我的程序中缺少了什么,这限制了我将消息写入主题。
注意:我正在读取一个json文件并使用for循环来准备键值。然后将其赋给一个变量,并用kafka product with arg将该变量引用为msg。
附件是Kafka制片人计划。
输入:json\u smpl.json
文件内容:
{
"transaction":{
"Accnttype":"Saving"
,"Branch":"West"
,"id":"WS"
}
}
课程:
from confluent_kafka import Producer
import json
def acked(err, msg):
if err is not None:
print("Failed to deliver message: {0}: {1}"
.format(msg.value(), err.str()))
else:
print("Message produced: {0}".format(msg.value()))
p = Producer({'bootstrap.servers': 'localhost:9092'})
try:
with open('json_smpl.json') as read_j:
data = json.load(read_j)
get_data = data.get("transactions")
print(get_data)
for i in get_data:
a = list(get_data.items()[0])
p.produce(topic='mytopic12', 'myvalue #{0}'.format(a), callback=acked)
except KeyboardInterrupt:
pass
p.flush(1)
预期结果:对于循环中的每个迭代,将消息(json键和值)写入kafka主题。
实际结果:主题中没有消息。所以消费者没有收到任何信息。
1条答案
按热度按时间z5btuh9x1#
你的文件没有
transactions
键,并且没有循环,所以您的json没有被解析,您也没有捕获到keyerror或valueerror从这个开始