在尝试用kafka流传输avro数据时,我遇到了以下错误: Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
尽管我在邮件列表中找到了几个关于它的旧线程,但是没有一个解决方案解决了这个问题。希望我能找到解决办法。
我的设置如下所示:
StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName
StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, localhost:8081)
我已经试过设置 KEY_SERDE
与 VALUE_SERDE
,但即使这在邮件列表中被“标记”为一个解决方案,在我的情况下也不起作用。
我正在生成 GenericData.Record
我的模式如下:
val record = new GenericData.Record(schema)
...
record.put(field, value)
当我启动调试模式并检查生成的记录时,一切看起来都很好,记录中有数据并且Map是正确的。
我像这样流化kstream(我以前使用过branch): splitTopics.get(0).to(s"${destTopic}_Testing")
我在用 GenericData.Record
记录在案。这可能是一个问题与 GenericAvroSerde
?
1条答案
按热度按时间mwecs4sa1#
我的问题的解决办法是交换
VALUE_SERDE
在反序列化从输入主题获得的字符串值之后。自
Serde
是序列化和反序列化的组合“元素”,我不能简单地使用StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]
但必须使用StringSerde
用于反序列化输入记录,然后仅使用AvroSerde
将其写入输出主题。现在看起来像这样:
这样,它就像魅力。
谢谢您