json生产者

sczxawaw  于 2021-07-13  发布在  Hadoop
关注(0)|答案(1)|浏览(380)

我尝试了一个简单的Kafka生产者第一次将采取从json文件记录的数据记录。但我犯了个错误。
我的json文件(test.json):

{
  "states": 
  [
    {
      "name": "Alabama",
      "abbreviation": "AL"
    },
    {
      "name": "Alaska",
      "abbreviation": "AK"
    }
  ]
}

我的制作人课程:

import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
print('Producer created..............')

with open('/home/ravi/test.json') as f:
    data = json.load(f)

    for state in data['states']:
        producer.send('ECJson', json.dump(state))

但我得到了一个错误:

Producer created..............
Traceback (most recent call last):
  File "prodECJson.py", line 10, in <module>
    producer.send('ECJson', json.dump(state))
TypeError: dump() missing 1 required positional argument: 'fp'
gpfsuwkq

gpfsuwkq1#

json.dump 将写入磁盘。你想要的是 json.dumps 将从给定对象创建一个字符串。kafkaproducer的send函数需要字节,因此也必须对字符串进行编码。也可以直接在kafkaproducer对象中指定序列化程序。
例如:

import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092', 
                         value_serializer=lambda m: json.dumps(m).encode("utf-8"))
print('Producer created..............')

with open('/home/ravi/test.json') as f:
    data = json.load(f)

    for state in data['states']:
        producer.send('ECJson', state)

相关问题