我正在尝试将protobuf消息转换为Avro记录,以便使用KafkaProducer将其发送到Kafka主题。我知道有一个用于值.serializer的KafkaProtbufSerializer,但我想将protobuf消息转换为Avro以使用KafkaAvroSerializer。我该如何操作?
dced5bon1#
使用值为.*de a的使用者的KafkaProtobuf反序列化器,然后构造一个新的POJO,用于生产者的KafkaAvroSerializer。
db2dz4w82#
你也许可以尝试这样的方法,如果你有Protobuf模式和等价的Avro模式,以及用支持反射的语言从两者生成的代码,你可以用它来自动将GPB对象的字段转换成Avro对象。字段名和结构需要匹配(或以某种可预测的方式紧密匹配,例如大写或小写)。其思想是,您有一个函数,该函数循环遍历GPB对象中的每个字段,并使用字段名/路径来指向Avro对象中的等效字段,然后进行赋值:
// Psuedocode foreach (ValueMember gpbObjField in gpbObj) { avroObj.FieldByName(gpbObjField.Name).Value = gpbObjField.Value }
并让它递归地调用自己的非原语类型(例如嵌套消息)。或者类似的东西。我在C#中做过类似的事情。这样做的好处是可以防止模式的更改,这会很方便,否则,如果您手写代码对整个对象执行objA.Field = objB.Field,维护起来会很痛苦。
2条答案
按热度按时间dced5bon1#
使用值为.*de a的使用者的KafkaProtobuf反序列化器,然后构造一个新的POJO,用于生产者的KafkaAvroSerializer。
db2dz4w82#
你也许可以尝试这样的方法,如果你有Protobuf模式和等价的Avro模式,以及用支持反射的语言从两者生成的代码,你可以用它来自动将GPB对象的字段转换成Avro对象。
字段名和结构需要匹配(或以某种可预测的方式紧密匹配,例如大写或小写)。
其思想是,您有一个函数,该函数循环遍历GPB对象中的每个字段,并使用字段名/路径来指向Avro对象中的等效字段,然后进行赋值:
并让它递归地调用自己的非原语类型(例如嵌套消息)。或者类似的东西。我在C#中做过类似的事情。
这样做的好处是可以防止模式的更改,这会很方便,否则,如果您手写代码对整个对象执行objA.Field = objB.Field,维护起来会很痛苦。