json Apache Kafka使用者无法反序列化java POJO

4c8rllxm  于 2022-11-19  发布在  Apache
关注(0)|答案(1)|浏览(121)

我在配置Kafka消费者以反序列化由生产者作为有效负载发送的Java对象时遇到问题。我将收到以下错误消息:
无法将GenericMessage [有效负载={“类型”:“doc”,“内容”:“A”},(..)]从[java.lang.String]转换为[foo.bar.MyObject]
我的对象只包含两个字段:

@Data
public class MyObject {
    String type;
    String content;
}

我已经为我的Kafka配置中的对象添加了一个JSON序列化器:

@Configuration
public class KafkaConf {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        JsonDeserializer myJsonDeserializer = new JsonDeserializer<>(MyObject.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, myJsonDeserializer);
        return props;
    }

    @Bean
    public ConsumerFactory<String, MyObject> myObjectConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

}

我还在Kafka侦听器中明确指定了我的消费者工厂,如下所示:

@KafkaListener(topics = "doc", groupId = "repliesGroup", containerFactory = "myObjectConsumerFactory")
    @SendTo
    public List<String> listen(MyObject foo) {
        return myService.doStuff(foo);
    }

我还遗漏了什么?

50few1ms

50few1ms1#

我运行了同样的问题,在设置containerFactory(您已经有了这个)并按类名添加反序列化器之后,它被修复了:

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());

还添加了使用者的默认类型:

properties.put(JsonDeserializer.VALUE_DEFAULT_TYPE,"foo.bar.MyObject");

另外,我的@KafkaLister方法还注解了@Payload参数。

相关问题