我尝试了一个简单的Kafka生产者第一次将采取从json文件记录的数据记录。但我犯了个错误。
我的json文件(test.json):
{
"states":
[
{
"name": "Alabama",
"abbreviation": "AL"
},
{
"name": "Alaska",
"abbreviation": "AK"
}
]
}
我的制作人课程:
import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
print('Producer created..............')
with open('/home/ravi/test.json') as f:
data = json.load(f)
for state in data['states']:
producer.send('ECJson', json.dump(state))
但我得到了一个错误:
Producer created..............
Traceback (most recent call last):
File "prodECJson.py", line 10, in <module>
producer.send('ECJson', json.dump(state))
TypeError: dump() missing 1 required positional argument: 'fp'
1条答案
按热度按时间gpfsuwkq1#
json.dump
将写入磁盘。你想要的是json.dumps
将从给定对象创建一个字符串。kafkaproducer的send函数需要字节,因此也必须对字符串进行编码。也可以直接在kafkaproducer对象中指定序列化程序。例如: