private KafkaTemplate<String, KafkaMessage> kafkaTemplate;
Message<KafkaMessage> message = MessageBuilder
.withPayload(kafkaMessage)
.setHeader(KafkaHeaders.TOPIC, targetTopic)
.setHeader(KafkaHeaders.MESSAGE_KEY, "someStringValue" )
.setHeader("X-Custom-Header", headerCreator.generateHeader(source, type)).build();
ListenableFuture<SendResult<String, KafkaMessage>> listenableFuture = kafkaTemplate.send(message);
这是我的密码。异常发生在send方法。
例外是 java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
?
1条答案
按热度按时间ddarikpa1#
假设kafka主题需要一个avro序列化对象,您可以将插件“avro-maven-plugin”添加到项目pom中,并让maven为您生成avro类。
这个插件读取avro模式的文件,并自动(一旦项目生成)生成pojo类。如果架构包含错误或无效,则在执行任何代码之前都会收到警告。
kafkateamplate应该使用这个pojo而不是kafkamessage。
我建议阅读如何在spring引导应用程序中使用schema registry和avro,以获得一个完整的使用者和生产者示例,使用合流组件进行总体项目配置(serdes、schema registry等)。