我对在一个社区中运行的nodejs和apache kafka中出现的protobuf序列化有一个问题。
我用googleprotobuf或protobufjs序列化数据,并用kafkajs将其发送给kafka。但是,当我提交数据时,kafka protobuf console consumer会给我一个序列化异常。请检查源代码并帮助我。https://github.com/smhmayboudi/kafka-protobuf-console
org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id 53
Caused by: java.lang.IllegalArgumentException: Invalid message indexes: io.confluent.kafka.schemaregistry.protobuf.MessageIndexes@59d77850
at io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema.toMessageName(ProtobufSchema.java:903)
at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:119)
at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:98)
at io.confluent.kafka.formatter.protobuf.ProtobufMessageFormatter$ProtobufMessageDeserializer.deserialize(ProtobufMessageFormatter.java:130)
at io.confluent.kafka.formatter.protobuf.ProtobufMessageFormatter$ProtobufMessageDeserializer.deserialize(ProtobufMessageFormatter.java:104)
at io.confluent.kafka.formatter.protobuf.ProtobufMessageFormatter.writeTo(ProtobufMessageFormatter.java:88)
at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:173)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:118)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
1条答案
按热度按时间jljoyd4f1#
我在杰拉德·克莱斯的帮助下,在合流的松弛中讨论,找到了解决办法。谢谢,杰拉德·克莱斯和里卡多·费雷拉。另外,我更新了git回购协议。
java和其他语言之间的序列化程序格式是不同的。因此,您必须遵循以下格式化样式:[magic byte]+[schema id]+[message index data]+[message payload],即消息索引数据为零。
资料来源:https://riferrei.com/2020/07/09/data-sharing-between-java-go-using-kafka-and-protobuf/