python—尝试为每次迭代生成kafka主题的消息,但看起来我最终没有向消费者发送消息

wtzytmuj  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(246)

使用循环调用kakfa product类时,无法将消息写入kafka主题(producer)。
我对Python和Kafka很陌生。我正在尝试编写一个python程序,将消息写入kafka主题并生成,这样kafka消费者就可以订阅该主题来发布消息。
我不确定我的程序中缺少了什么,这限制了我将消息写入主题。
注意:我正在读取一个json文件并使用for循环来准备键值。然后将其赋给一个变量,并用kafka product with arg将该变量引用为msg。
附件是Kafka制片人计划。
输入:json\u smpl.json
文件内容:

{
"transaction":{
"Accnttype":"Saving"
,"Branch":"West"
,"id":"WS"
}
}

课程:

from confluent_kafka import Producer
import json

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: {0}: {1}"
              .format(msg.value(), err.str()))
    else:
        print("Message produced: {0}".format(msg.value()))

p = Producer({'bootstrap.servers': 'localhost:9092'})
try:
    with open('json_smpl.json') as read_j:
        data = json.load(read_j)
        get_data = data.get("transactions")
    print(get_data)
    for i in get_data:
        a = list(get_data.items()[0])
        p.produce(topic='mytopic12', 'myvalue #{0}'.format(a), callback=acked)
except KeyboardInterrupt:
    pass
p.flush(1)

预期结果:对于循环中的每个迭代,将消息(json键和值)写入kafka主题。
实际结果:主题中没有消息。所以消费者没有收到任何信息。

z5btuh9x

z5btuh9x1#

你的文件没有 transactions 键,并且没有循环,所以您的json没有被解析,您也没有捕获到keyerror或valueerror
从这个开始

p = Producer({'bootstrap.servers': 'localhost:9092'})
try:
    with open('json_smpl.json') as read_j:
        data = json.load(read_j).get("transaction")
        tosend = json.dumps(data)
        print("Ready to send : {}".format(tosend))
        p.produce(topic='mytopic12', tosend, callback=acked)
except:
    print("There was some error")

相关问题