使用python的avro库推送数据时发生kafka avro反序列化错误

oipij1gg  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(372)

我已经设置了一个kafka集群,其中kafka connect节点具有postgres的sink配置。
avro架构:

{
    "namespace": "example.avro",
    "type": "record",
    "name": "topicname",
    "fields": [
        {"name": "deviceid", "type": "string"},
        {"name": "longitude", "type": "float"},
        {"name": "latitude",  "type": "float"}
    ]
}

我发布数据的python代码是:


# Path to user.avsc avro schema

SCHEMA_PATH = "user.avsc"
SCHEMA = avro.schema.parse(open(SCHEMA_PATH).read())

writer = DatumWriter(SCHEMA)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
writer.write({"deviceid":"9098", "latitude":  90.34 , "longitude": 334.4}, encoder)
raw_bytes = bytes_writer.getvalue()
PRODUCER.send_messages(TOPIC, raw_bytes)

我在kafka连接日志中收到以下错误:
org.apache.kafka.common.errors.serializationexception:错误
正在反序列化id为-1的avro消息\n使用者:org.apache.kafka.common.errors.serializationexception:未知的魔法字节\n,“id”:0,“worker\u id”:“0.0.0.0:8083”}],“type”:“sink”}
有什么问题吗?
或者,对于所提到的json数据,正确的avro方案应该是什么?

oogrdqng

oogrdqng1#

我对各种python客户机没有做太多的工作,但是这个magic byte错误几乎是肯定的,因为您发送的可能是有效的avro,但是如果您想与schema registry集成,负载需要采用不同的格式(额外的头信息,此处记录https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html 搜索wire格式或magic byte)。我个人会尝试使用confluent的python kafka客户端--https://github.com/confluentinc/confluent-kafka-python --它有使用avro和schema注册表的例子。

相关问题