无法将特定于avro的记录反序列化为SpringCloudStream中的通用记录

fv2wmkja  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(710)

我希望有一个具有springcloudstream的通用使用者,我不需要在编译时指定avro消息的模式。由于schema id包含在消息中,因此应该可以使用schema id将其反序列化为对象或通用记录(这可以在spring云流框架之外轻松完成)。为此,我尝试使用与spring云流应用程序具有特定记录相同的方法。显然,这种方法不起作用,因为我需要在消费时指定记录类型。它引发以下异常:

Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class com.example.avro.model.InputModel specified in writer's schema whilst finding reader's schema for a SpecificRecord.

我想知道是否有一种方法可以在编译时不知道模式(和消息类型)的情况下使用来自主题的消息。
消费者的一个片段:

@StreamListener(Processor.INPUT)
public void handleMessage(Object message) {
...
}

p、 s:我在用 io.confluent.kafka.serializers.KafkaAvroDeserializer 对于值反序列化程序。

bogh5gae

bogh5gae1#

我也看到过类似的情况,在我的yml中,指定了Kafka夫罗德塞利泽

consumer-properties:
        key.deserializer:org.apache.kafka.common.serialization.StringDeserializer
        value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

然后在应用程序中,

@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public void hand(Object payload) throws IOException {
        GenericRecord record = (GenericRecord) payload;

}
jvidinwx

jvidinwx2#

我可以通过设置 spring.kafka.properties.specific.avro.reader= false 与处理程序的以下方法签名结合使用:

@StreamListener(Processor.INPUT)
public void handleMessage(GenericRecord record) {
...
}

相关问题