如何从python客户端向kafka发送json对象

x6h2sr28  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(792)

我有一个简单的json对象,如下所示

d = { 'tag ': 'blah',
  'name' : 'sam',
  'score': 
    {'row1': 100,
      'row2': 200
     }
}

下面是我的python代码,它向kafka发送消息

from kafka import SimpleProducer, KafkaClient
import json 

# To send messages synchronously

kafka = KafkaClient('10.20.30.12:9092')
producer = SimpleProducer(kafka)
jd = json.dumps(d)
producer.send_messages(b'message1',jd)

我在storm日志中看到消息正在被接收,但是它为元组{json structure in here}抛出的转换为空,不确定需要做什么来修复这个问题?。。

7nbnzgx9

7nbnzgx91#

Kafka期望值以字节为单位

b`some json message`

这里是我的简单Kafka生产者发送消息到Kafka服务器。

import json
from bson import json_util

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

for i in range(10):
    data = { 'tag ': 'blah',
        'name' : 'sam',
        'index' : i,
        'score': 
            {'row1': 100,
             'row2': 200
        }
    }   
    producer.send('orders', json.dumps(data, default=json_util.default).encode('utf-8'))

这里,json.dumps()将json转换为字符串,encode('utf-8')将字符串转换为字节数组。

ohfgkhjo

ohfgkhjo2#

下面是我为Kafka制作的代码。我唯一不同的做法就是 yaml.safe_load 加载json内容。它以字符串而不是unicode返回内容。下面是代码段

with open('smaller_test_prod.txt') as f:
    for line in f:
        d = yaml.safe_load(line)
        jd = json.dumps(d)
        producer.send_messages(b'zeus_metrics',jd)

这里的每一行都是存储在文件中的json数据。

相关问题