我需要将自定义值序列化程序设置到spring的kafkatemplate中。值序列化程序如下所示: JsonSerializer<JourneyMailExchange> serializer = new JsonSerializer<>(customObjectMapper);
(例如,为了 PropertyNamingStrategy.SNAKE_CASE
)
使用kafkaproducer很容易做到:
new KafkaProducer<>(properties, Serdes.String().serializer(), valueSerializer);
但我还没有找到这样做的可能性 KafkaTemplate
. 我只看到 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
的属性 ProducerFactory
,但这不是我想要的(不可能提供具体的示例)。
我们通过以下方式创建kafkatemplate:
@Bean
public ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaProducerFactory<>(producerConfigs(kafkaProperties.getDefaultSettings()));
}
private static Map<String, Object> producerConfigs(Map<String, String> defaultSettings) {
Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setDefaultTopic("topicName");
return kafkaTemplate;
}
还发现了以下方法:
kafkaTemplate.setMessageConverter(new StringJsonMessageConverter(customObjectMapper));
但只有当我们使用spring时,它才会应用转换 Message
如果调用带有键和值的send方法,则忽略提供的转换。
SpringKafka版本:2.1.0.0
1条答案
按热度按时间hujrc8aj1#
最后我发现,通过配置多个
DefaultKafkaProducerFactory
通过向构造函数提供值序列化程序:特定值序列化程序的最终版本如下所示(已配置的
ProducerFactory
与所需的不同值序列化程序数相同):