我正在为Kafka使用spring,我想使用DefaultKafkaProducerFactoryCustomizer
来定制我的生产者工厂,因为我需要多值序列化器,所以我从
@Bean
public DefaultKafkaProducerFactoryCustomizer defaultKafkaProducerFactoryCustomizer() {
return (producerFactory) -> {
producerFactory.setKeySerializer(new StringSerializer());
};
}
但是编译器不高兴
incompatible types: org.apache.kafka.common.serialization.StringSerializer cannot be
converted to org.apache.kafka.common.serialization.Serializer<capture#1 of ?>
与相同的问题(我使用ByteArraySerializer
发送无法反序列化到dlq的消息)
producerFactory.setValueSerializer(
new DelegatingByTypeSerializer(
Map.of(
byte[].class,
new ByteArraySerializer(),
Foo.class,
new JsonSerializer<>())));
我得到了:
incompatible types: org.springframework.kafka.support.serializer.DelegatingByTypeSerializer
cannot be converted to org.apache.kafka.common.serialization.Serializer<capture#1 of ?>
但只要有
producerFactory.setValueSerializer(new JsonSerializer<>());
DefaultKafkaProducerFactoryCustomizer
类的定义如下
@FunctionalInterface
public interface DefaultKafkaProducerFactoryCustomizer {
/**
* Customize the {@link DefaultKafkaProducerFactory}.
* @param producerFactory the producer factory to customize
*/
void customize(DefaultKafkaProducerFactory<?, ?> producerFactory);
}
如何解决这个问题?
我知道配置ValueSerialize
和KeySerializer
的其他替代方案,但我想首先知道为什么我的代码不能正常工作
1条答案
按热度按时间11dmarpk1#
导致此问题的原因是
DefaultKafkaProducerFactory
使用通配符类型为其键和值序列化器定义。发送到setKeySerializer
或setValueSerializer
方法的参数与特定的序列化器类型不兼容。它需要Serializer<?>
,但您提供的是StringSerializer
或DelegatingByTypeSerializer
。使用类型转换(代码如下)应该可以解决这个问题;