通过kafka发送包含多个json对象的json文件

tktrz96b  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(935)

我有一个包含以下格式的多个json文档的文件。 {"attribute1": "value1", "attribute2": "value2", "attribute3": "value3", "attribute4": "value4"} {"attribute1": "value11", "attribute2": "value12", "attribute3": "value13", "attribute4": "value14"} {"attribute1": "value21", "attribute22": "value2", "attribute23": "value3", "attribute4": "value24"} 我正在尝试将单独的json文档发送给kafka。脚本以退出代码0执行,但我看不到来自kafka消费者的消息。我不知道我哪里出错了。
我的代码如下:

import csv
import json

bootstrap = ['hostname:9092']
valueSerializer = lambda x: dumps(x).encode('utf-8')

producer = KafkaProducer(bootstrap_servers = bootstrap, value_serializer = valueSerializer)

table = []
with open('~/json_file_name.json', 'r') as json_file:
    for line in json_file:
        table.append(json.loads(line))

# numrows = len(table)

# print(numrows)

for row in table:
    print(row)
    producer.send('Topic_Name', value=row)
ndasle7k

ndasle7k1#

很可能您没有为生产商发送足够的数据来刷新其批处理。您尚未显示kafkaproducer的导入,但请查看是否可以 producer.flush() 在剧本的结尾
您不需要表变量,顺便说一下,只要在读取文件行时发送即可。你也不需要 dumps(x) 因为您发送的字符串是 json.loads 已经
也可以删除csv导入

相关问题