java 尝试使用生产者自定义程序序列化时不兼容的类型

ukqbszuj  于 2023-04-19  发布在  Java
关注(0)|答案(1)|浏览(118)

我正在为Kafka使用spring,我想使用DefaultKafkaProducerFactoryCustomizer来定制我的生产者工厂,因为我需要多值序列化器,所以我从

@Bean
  public DefaultKafkaProducerFactoryCustomizer defaultKafkaProducerFactoryCustomizer() {
    return (producerFactory) -> {
      producerFactory.setKeySerializer(new StringSerializer());
    };
  }

但是编译器不高兴

incompatible types: org.apache.kafka.common.serialization.StringSerializer cannot be 
converted to org.apache.kafka.common.serialization.Serializer<capture#1 of ?>

与相同的问题(我使用ByteArraySerializer发送无法反序列化到dlq的消息)

producerFactory.setValueSerializer(
      new DelegatingByTypeSerializer(
          Map.of(
              byte[].class,
              new ByteArraySerializer(),
              Foo.class,
              new JsonSerializer<>())));

我得到了:

incompatible types: org.springframework.kafka.support.serializer.DelegatingByTypeSerializer 
cannot be converted to org.apache.kafka.common.serialization.Serializer<capture#1 of ?>

但只要有

producerFactory.setValueSerializer(new JsonSerializer<>());

DefaultKafkaProducerFactoryCustomizer类的定义如下

@FunctionalInterface
public interface DefaultKafkaProducerFactoryCustomizer {

    /**
     * Customize the {@link DefaultKafkaProducerFactory}.
     * @param producerFactory the producer factory to customize
     */
    void customize(DefaultKafkaProducerFactory<?, ?> producerFactory);

}

如何解决这个问题?
我知道配置ValueSerializeKeySerializer的其他替代方案,但我想首先知道为什么我的代码不能正常工作

11dmarpk

11dmarpk1#

导致此问题的原因是DefaultKafkaProducerFactory使用通配符类型为其键和值序列化器定义。发送到setKeySerializersetValueSerializer方法的参数与特定的序列化器类型不兼容。它需要Serializer<?>,但您提供的是StringSerializerDelegatingByTypeSerializer
使用类型转换(代码如下)应该可以解决这个问题;

@Bean
public DefaultKafkaProducerFactoryCustomizer defaultKafkaProducerFactoryCustomizer() {
  return (producerFactory) -> {
    producerFactory.setKeySerializer((Serializer<String>) new StringSerializer());
  };
}


producerFactory.setValueSerializer((Serializer<Object>) new DelegatingByTypeSerializer(
      Map.of(
          byte[].class,
          new ByteArraySerializer(),
          Foo.class,
          new JsonSerializer<>())
));

相关问题