spring云kafka和avro序列化问题

gywdnpxw  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(488)

我用 spring-cloud-stream-schema 阅读Kafka的avro信息。我在中配置了输入通道 MessagesChannels :

@Input("topicName1")
SubscribableChannel fromInput1();

我有这样的配置文件:

@Configuration
@EnableBinding(MessagesChannels.class)
@EnableSchemaRegistryClient
public class MessageConfiguration {
  @Bean
  public MessageConverter topic1MessageConverter() throws IOException {
    return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
  }
}

我的消费者被打电话给

fromInput1().subscribe(this::onMessage);

void onMessage(Message message) {
}

当我实际发送消息时,我得到了以下错误:

nested exception is java.lang.ClassCastException: 
org.apache.avro.generic.GenericData$Record cannot be cast to [B

实际上,原始字节被正确地解析为 org.apache.avro.generic.GenericData$Record . 但Spring需要 Message 班级。如何铸造 GenericData$RecordMessage 或者怎么投 GenericData$Record 直接由avro工具类生成?
更多详细信息:

2017-03-06 11:23:10.695 ERROR 19690 --- [afka-listener-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = topic1, partition = 0, offset = 7979, CreateTime = 1488784987569, checksum = 623709057, serialized key size = -1, serialized value size = 36, key = null, value = {"foor": "bar"})

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$ReceivingHandler@4bf9d802]; nested exception is java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to [B
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139)
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70)
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64)
ntjbwcob

ntjbwcob1#

我想你需要设定 contentType 对于要使用的传入消息通道 application/*+avro 如此处所述

相关问题