我有一个json对象,它有不同的顺序状态。我想把这个推给Apache·Kafka。序列是至关重要的,因为我已经设置了消费数据的消费者。我计划使用order id作为消息的键,因为它将按顺序存储在同一分区中。
data = [{"id": 1, "orderId": "A123","status":"PLACED"},{"id": 2, "orderId": "B123","status":"PLACED"}
{"id": 3, "orderId": "A123","status":"DISPATCHED"}]
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=os.environ['KAFKA_BROKER_URLS'],security_protocol="SSL")
for item in data:
d = json.dumps(item)
future = producer.send(topic,key=item['orderId'], value=d)
但我在制作人那里犯了一个错误。我甚至试过
d = json.dumps(item).encode('utf-8')
future = producer.send(topic,key=item['orderId'], value=d)
通过在初始化代码中定义值序列化程序
producer = KafkaProducer(bootstrap_servers=os.environ['KAFKA_BROKER_URLS'],security_protocol="SSL",value_serializer=lambda v: json.dumps(v).encode('utf-8'))
他们都给出了相同的错误。如果我移除密钥并像这样传递数据
d = json.dumps(item).encode('utf-8')
future = producer.send(topic,d)
这是工作和信息被推到Kafka和显示在消费者方面。但我想传递一个密钥,用于维护相同订单id的序列。我该如何解决它?
更新1:
错误如下:
{
"errorType": "AssertionError",
"stackTrace": [
" File \"/var/task/producer.py\", line 56, future = producer.send(topic,key=item['orderId'], value=d)\n",
" File \"/opt/python/lib/python3.7/site-packages/kafka/producer/kafka.py\", line 572, in send\n assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))\n"
]
}
暂无答案!
目前还没有任何答案,快来回答吧!