spring 如何通过属性注入自定义反序列化程序?

eqoofvh9  于 2022-12-17  发布在  Spring
关注(0)|答案(2)|浏览(132)

我正在使用Spring。我有一个为整个项目配置的ObjectMapper,我用它来设置一个Kafka反序列化器。然后我需要一个自定义的Kafka反序列化器来在KafkaListener中使用。
我是通过自动配置来配置KafkaListener的,而不是通过@Configuration类。

@Component
@RequiredArgsConstructor
public class CustomMessageDeserializer implements Deserializer<MyMessage> {
    private final ObjectMapper objectMapper;

    @SneakyThrows
    @Override
    public MyMessage deserialize(String topic, byte[] data) {
        return objectMapper.readValue(data, MyMessage.class);
    }
}

如果我真的喜欢这个

@KafkaListener(
    topics = {"${topics.invite-user-topic}"},
    properties = {"value.deserializer=com.service.deserializer.CustomMessageDeserializer"}
)
public void receiveInviteUserMessages(MyMessage myMessage) {}

我收到了KafkaException:未能找到公共无参数构造函数
但是使用CustomMessageDeserializer类中的公共无参数构造函数,我得到了NPE,因为ObjectMapper = null。它创建并使用了一个新类,而不是一个Spring组件。
@KafkaListener支持SpEL表达式。
而且我认为这个问题可以用SpEL来解决,你知道如何用SpEL来注入spring bean CustomMessageDeserializer吗?

mtb9vblg

mtb9vblg1#

用SPeL做这件事并不容易。

分析

要开始使用,请参阅@KafkaListener#properties的JavaDoc:

/**
* 
* SpEL expressions must resolve to a String ...
*/

value.deserializer的值用于示例化指定的反序列化器类。让我们遵循调用链:
1.如果在@KafkaListener注解中指定了这个值,那么可能就不会创建ConsumerFactory.class的bean,所以Spring会自己创建这个bean类--参见KafkaAutoConfiguration#kafkaConsumerFactory
1.接下来是使用默认传递表达式keyDeserializer/valueDeserializer = () -> null的构造函数将返回对象new DefaultKafkaConsumerFactory(...)创建为ConsumerFactory<?,?>
1.此工厂用于创建Kafka使用者(入口点是构造函数KafkaMessageListenerContainer#ListenerConsumer,然后是KafkaMessageListenerContainer.this.consumerFactory.createConsumer...
1.在KafkaConsumer构造函数中,创建valueDeserializer对象,因为它为null(对于上面第2点的默认工厂):

if (valueDeserializer == null) {
     this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
  1. config.getConfiguredInstance的实现涉及使用反射和String"com.service.deserializer.CustomMessageDeserializer"类名通过无参数构造函数示例化反序列化器类
溶液

1.要将value.deserializer与定制的ObjectMapper一起使用,必须使用setValueDeserializer(...)方法自己创建ConsumerFactory bean。这在JSON.Mapping_Types.重要文档的第二个重要部分中也有提及
1.如果你不想创建一个ConsumerFactory bean,并且在你的反序列化器中没有复杂的逻辑(你只有return objectMapper.readValue(data, MyMessage.class);),那么注册DefaultKafkaConsumerFactoryCustomizer

@Bean
// inject your custom objectMapper
public DefaultKafkaConsumerFactoryCustomizer customizeJsonDeserializer(ObjectMapper objectMapper) {  
    return consumerFactory ->
            consumerFactory.setValueDeserializerSupplier(() ->
                    new org.springframework.kafka.support.serializer.JsonDeserializer<>(objectMapper));
}

在这种情况下,你不需要创建你自己的CustomMessageDeserializer类(删除它),Spring会自动将消息解析到您的MyMessage中。@KafkaListener注解也不应包含属性properties = {"value.deserializer=com.my.kafka_test.component.CustomMessageDeserializer"}。此DefaultKafkaConsumerFactoryCustomizer bean将自动用于配置默认ConsumerFactory<?, ?>(参见KafkaAutoConfiguration#kafkaConsumerFactory方法的实现)

yc0p9oo0

yc0p9oo02#

以下是它对我的作用:

@KafkaListener(topics = "${solr.kafka.topic}", containerFactory = "batchFactory")
public void listen(List<SolrInputDocument> docs, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers, Acknowledgment ack) throws IOException {...}

然后,我在配置中定义了2个Bean

@Profile("!test")
@Bean
@Autowired
public ConsumerFactory<String, SolrInputDocument> consumerFactory(KafkaProperties properties) {
    Map<String, Object> props = properties.buildConsumerProperties();
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    DefaultKafkaConsumerFactory<String, SolrInputDocument> result = new DefaultKafkaConsumerFactory<>(props);
    String validatedKeyDeserializerName = KafkaMessageType.valueOf(keyDeserializerName).toString();
    ZiDeserializer<SolrInputDocument> deserializer = ZiDeserializerFactory.getInstance(validatedKeyDeserializerName);
    result.setValueDeserializer(deserializer);

    return result;
}

@Profile("!test")
@Bean
@Autowired
public ConcurrentKafkaListenerContainerFactory<String, SolrInputDocument> batchFactory(ConsumerFactory<String, SolrInputDocument> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, SolrInputDocument> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);
    factory.setConcurrency(2);

    ExponentialBackOffWithMaxRetries backoff = new ExponentialBackOffWithMaxRetries(10);
    backoff.setMultiplier(3); // Default is 1.5 but this seems more reasonable
    factory.setCommonErrorHandler(new DefaultErrorHandler(null, backoff));

    // Needed for manual commits
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

注意,接口ZiDeserializer<SolrInputDocument> deserializer是我的接口,ZiDeserializerFactory.getInstance(validatedKeyDeserializerName);返回我的ZiDeserializer的自定义实现,ZiDeserializer扩展org.apache.kafka.common.serialization.Deserializer

相关问题