我已经设置了一个kafka集群,其中kafka connect节点具有postgres的sink配置。
avro架构:
{
"namespace": "example.avro",
"type": "record",
"name": "topicname",
"fields": [
{"name": "deviceid", "type": "string"},
{"name": "longitude", "type": "float"},
{"name": "latitude", "type": "float"}
]
}
我发布数据的python代码是:
# Path to user.avsc avro schema
SCHEMA_PATH = "user.avsc"
SCHEMA = avro.schema.parse(open(SCHEMA_PATH).read())
writer = DatumWriter(SCHEMA)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
writer.write({"deviceid":"9098", "latitude": 90.34 , "longitude": 334.4}, encoder)
raw_bytes = bytes_writer.getvalue()
PRODUCER.send_messages(TOPIC, raw_bytes)
我在kafka连接日志中收到以下错误:
org.apache.kafka.common.errors.serializationexception:错误
正在反序列化id为-1的avro消息\n使用者:org.apache.kafka.common.errors.serializationexception:未知的魔法字节\n,“id”:0,“worker\u id”:“0.0.0.0:8083”}],“type”:“sink”}
有什么问题吗?
或者,对于所提到的json数据,正确的avro方案应该是什么?
1条答案
按热度按时间oogrdqng1#
我对各种python客户机没有做太多的工作,但是这个magic byte错误几乎是肯定的,因为您发送的可能是有效的avro,但是如果您想与schema registry集成,负载需要采用不同的格式(额外的头信息,此处记录https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html 搜索wire格式或magic byte)。我个人会尝试使用confluent的python kafka客户端--https://github.com/confluentinc/confluent-kafka-python --它有使用avro和schema注册表的例子。