Kafka 指定自定义ObjectMapper时忽略spring.json.type.mapping属性

j13ufse2  于 2023-01-12  发布在  Apache
关注(0)|答案(1)|浏览(99)

我在将自定义ObjectMapper注入Spring Kafka序列化器时遇到了一个问题,我已经用这个answer解决了这个问题,LocalDateTime正在用正确的模式进行序列化。

@Configuration
public class KafkaCustomizer implements DefaultKafkaProducerFactoryCustomizer {

    @Bean
    public ObjectMapper objectMapper() {
        var mapper = new ObjectMapper();
        var module = new JavaTimeModule();
        var serializer = new LocalDateTimeSerializer(
                DateTimeFormatter.ofPattern(DateConstants.DATETIME_FORMAT_PATTERN));
        module.addSerializer(LocalDateTime.class, serializer);
        mapper.registerModule(module);
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return mapper;
    }

    @Override
    public void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {
        producerFactory.setValueSerializer(new JsonSerializer<>(objectMapper()));
    }

}

但是现在我面临另一个问题,spring.kafka.producer.properties.spring.json.type.mapping属性被忽略了。
我的记录的__TypeId__标头是用FQCN设置的,而不是用我放在spring.json.type.mapping属性中的令牌设置的:foo > com.foo.package.Foo
当我执行debbug时,org.springframework.kafka.support.serializer.JsonSerializer类的configure方法似乎没有被调用:

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
    ...
    if (configs.containsKey(TYPE_MAPPINGS) && !this.typeMapperExplicitlySet
            && this.typeMapper instanceof AbstractJavaTypeMapper) {
        ((AbstractJavaTypeMapper) this.typeMapper)
                .setIdClassMapping(createMappings((String) configs.get(TYPE_MAPPINGS)));
    }
}

但当我禁用自定义时

@Override
    public void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {
        // producerFactory.setValueSerializer(new JsonSerializer<>(objectMapper()));
    }

然后使用正确的标记设置__TypeId__标头,但正如预期的那样,我使用自定义ObjectMapper丢失了日期格式
那么如何处理这整个情况呢?

eufgjt7s

eufgjt7s1#

如果您自己执行new JsonSerializer<>,则需要自己为它提供适当的生产者配置。如果serialized的示例不受Kafka Client控制,则不调用configure()
我想说,在您的情况下,可以这样做:

public void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {
    JsonSerializer<Object> jsonSerializer = new JsonSerializer<>(objectMapper());
    jsonSerializer.configure(producerFactory.getConfigurationProperties(), false);
    producerFactory.setValueSerializer(jsonSerializer);
}

文档中有一些信息:https://docs.spring.io/spring-kafka/docs/current/reference/html/#tip-json,但是我们可能需要对它进行扩展,以便进行编程配置...

相关问题