我正在开发一个python微服务,它接收java微服务中生成的数据,并使用avro模式发送给kafka主题。我的问题是,我无法获得任何数据传输我的数据字段是原始的avro类型字节。在python端,我得到了一条被解析为正确模型的消息,但是bytearray字段总是空的,不管从java端发送什么。在java微服务中,发送的数据类型是java.nio.bytebuffer类型。
浮士德模型如下所示:
from faust import Record
class RoadagramPayload(Record, coerce=True):
roadagramPayload: bytearray
我使用以下代码设置了一个值序列化程序:
from schema_registry.client import SchemaRegistryClient, schema
from simple_settings import settings
client = SchemaRegistryClient(url=settings.SCHEMA_REGISTRY_URL)
schema = schema.AvroSchema(
{
"namespace": "com.zenuity.cloud.domain.roadobservation",
"type": "record",
"name": "RoadagramPayload",
"doc": "Avro schema for the roadagram payload",
"fields": [
{
"name": "roadagramPayload",
"type": "bytes",
"doc": "Mandatory. Roadagram payload."
}
]
}
)
avro_roadagram_serializer = FaustSerializer(client, "roadagram", schema)
最后,我使用以下代码从kafka主题接收数据:
from example.app import app
from example.codecs.avro import avro_roadagram_serializer
from roadobservation import RoadagramPayload
roadagram_topic = app.topic('zc.roadagram.ingress', partitions=1,
value_type=RoadagramPayload.RoadagramPayload,
value_serializer=avro_roadagram_serializer)
@app.agent(roadagram_topic)
async def recieve_roadagram(roadagrams):
async for roadagram in roadagrams:
logger.info(f"Roadagram: {roadagram}")
记录器打印的输出是(空负载):roadagram:roadagrampayload:roadagrampayload=b',无论我从java端发送什么。
我的猜测是我在浮士德模型中使用了错误的类型,我尝试了bytes和bytearray,得到了相同的结果。使用str还会产生一个空字符串,并且在我尝试将内容发送回另一个主题时也会出现错误。
暂无答案!
目前还没有任何答案,快来回答吧!