kafka streams-genericavroserde上的未知魔法字节

6ljaweal  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(365)

在尝试用kafka流传输avro数据时,我遇到了以下错误: Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte! 尽管我在邮件列表中找到了几个关于它的旧线程,但是没有一个解决方案解决了这个问题。希望我能找到解决办法。
我的设置如下所示:

StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName
StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]   
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, localhost:8081)

我已经试过设置 KEY_SERDEVALUE_SERDE ,但即使这在邮件列表中被“标记”为一个解决方案,在我的情况下也不起作用。
我正在生成 GenericData.Record 我的模式如下:

val record = new GenericData.Record(schema)
...
record.put(field, value)

当我启动调试模式并检查生成的记录时,一切看起来都很好,记录中有数据并且Map是正确的。
我像这样流化kstream(我以前使用过branch): splitTopics.get(0).to(s"${destTopic}_Testing") 我在用 GenericData.Record 记录在案。这可能是一个问题与 GenericAvroSerde ?

mwecs4sa

mwecs4sa1#

我的问题的解决办法是交换 VALUE_SERDE 在反序列化从输入主题获得的字符串值之后。
Serde 是序列化和反序列化的组合“元素”,我不能简单地使用 StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde] 但必须使用 StringSerde 用于反序列化输入记录,然后仅使用 AvroSerde 将其写入输出主题。
现在看起来像这样:

// default streams configuration serdes are different from the actual output configurations
val streamsConfiguration: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, kStreamsConf.getString("APPLICATION_ID"))
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kStreamsConf.getString("BOOTSTRAP_SERVERS_CONFIG"))
  p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kStreamsConf.getString("AUTO_OFFSET_RESET_CONFIG"))
  p.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kStreamsConf.getString("SCHEMA_REGISTRY_URL_CONFIG"))
  p
}

// adjusted output serdes for avro records
val keySerde: Serde[String] = Serdes.String
val valSerde: Serde[GenericData.Record] = new GenericAvroSerde()
valSerde.configure(
  Collections.singletonMap(
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
    streamsConfiguration.get(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG)
  ),
  /* isKeySerde = */ false
)

// Now using the adjusted serdes to write to output like this
stream.to(keySerde, valSerde, "destTopic")

这样,它就像魅力。
谢谢您

相关问题