我在配置Kafka消费者以反序列化由生产者作为有效负载发送的Java对象时遇到问题。我将收到以下错误消息:
无法将GenericMessage [有效负载={“类型”:“doc”,“内容”:“A”},(..)]从[java.lang.String]转换为[foo.bar.MyObject]
我的对象只包含两个字段:
@Data
public class MyObject {
String type;
String content;
}
我已经为我的Kafka配置中的对象添加了一个JSON序列化器:
@Configuration
public class KafkaConf {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
JsonDeserializer myJsonDeserializer = new JsonDeserializer<>(MyObject.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, myJsonDeserializer);
return props;
}
@Bean
public ConsumerFactory<String, MyObject> myObjectConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
}
我还在Kafka侦听器中明确指定了我的消费者工厂,如下所示:
@KafkaListener(topics = "doc", groupId = "repliesGroup", containerFactory = "myObjectConsumerFactory")
@SendTo
public List<String> listen(MyObject foo) {
return myService.doStuff(foo);
}
我还遗漏了什么?
1条答案
按热度按时间50few1ms1#
我运行了同样的问题,在设置containerFactory(您已经有了这个)并按类名添加反序列化器之后,它被修复了:
还添加了使用者的默认类型:
另外,我的@KafkaLister方法还注解了@Payload参数。