无法为spring kafkatemplate指定valueserializer示例

k4ymrczo  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(365)

我需要将自定义值序列化程序设置到spring的kafkatemplate中。值序列化程序如下所示: JsonSerializer<JourneyMailExchange> serializer = new JsonSerializer<>(customObjectMapper); (例如,为了 PropertyNamingStrategy.SNAKE_CASE )
使用kafkaproducer很容易做到:

new KafkaProducer<>(properties, Serdes.String().serializer(), valueSerializer);

但我还没有找到这样做的可能性 KafkaTemplate . 我只看到 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); 的属性 ProducerFactory ,但这不是我想要的(不可能提供具体的示例)。
我们通过以下方式创建kafkatemplate:

@Bean
public ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaProducerFactory<>(producerConfigs(kafkaProperties.getDefaultSettings()));
}

private static Map<String, Object> producerConfigs(Map<String, String> defaultSettings) {
    Map<String, Object> props = new HashMap<>(defaultSettings);
    props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
    kafkaTemplate.setDefaultTopic("topicName");
    return kafkaTemplate;
}

还发现了以下方法:

kafkaTemplate.setMessageConverter(new StringJsonMessageConverter(customObjectMapper));

但只有当我们使用spring时,它才会应用转换 Message 如果调用带有键和值的send方法,则忽略提供的转换。
SpringKafka版本:2.1.0.0

hujrc8aj

hujrc8aj1#

最后我发现,通过配置多个 DefaultKafkaProducerFactory 通过向构造函数提供值序列化程序:

public DefaultKafkaProducerFactory(Map<String, Object> configs, Serializer<K> keySerializer,
        Serializer<V> valueSerializer)

特定值序列化程序的最终版本如下所示(已配置的 ProducerFactory 与所需的不同值序列化程序数相同):

@Bean
public ProducerFactory<String, Object> producerFactoryWithSnakeCaseValueSerializer(KafkaProperties kafkaProperties) {
    Map<String, Object> props = producerConfigsWithSnakeCaseValueSerializer(kafkaProperties.getDefaultSettings());
    JsonSerializer<Object> valueSerializer = new JsonSerializer<>(createObjectMapper(SNAKE_CASE));
    valueSerializer.setAddTypeInfo(false);
    return new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);
}

private static Map<String, Object> producerConfigsWithSnakeCaseValueSerializer(Map<String, String> defaultSettings) {
    Map<String, Object> props = new HashMap<>(defaultSettings);
    ...
    return props;
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerConfigsWithSnakeCaseValueSerializer) {
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerConfigsWithSnakeCaseValueSerializer);
    kafkaTemplate.setDefaultTopic("topicName");
    return kafkaTemplate;
}

相关问题