我正在使用Spring。我有一个为整个项目配置的ObjectMapper,我用它来设置一个Kafka反序列化器。然后我需要一个自定义的Kafka反序列化器来在KafkaListener中使用。
我是通过自动配置来配置KafkaListener的,而不是通过@Configuration类。
@Component
@RequiredArgsConstructor
public class CustomMessageDeserializer implements Deserializer<MyMessage> {
private final ObjectMapper objectMapper;
@SneakyThrows
@Override
public MyMessage deserialize(String topic, byte[] data) {
return objectMapper.readValue(data, MyMessage.class);
}
}
如果我真的喜欢这个
@KafkaListener(
topics = {"${topics.invite-user-topic}"},
properties = {"value.deserializer=com.service.deserializer.CustomMessageDeserializer"}
)
public void receiveInviteUserMessages(MyMessage myMessage) {}
我收到了KafkaException:未能找到公共无参数构造函数
但是使用CustomMessageDeserializer类中的公共无参数构造函数,我得到了NPE,因为ObjectMapper = null。它创建并使用了一个新类,而不是一个Spring组件。
@KafkaListener支持SpEL表达式。
而且我认为这个问题可以用SpEL来解决,你知道如何用SpEL来注入spring bean CustomMessageDeserializer吗?
2条答案
按热度按时间mtb9vblg1#
用SPeL做这件事并不容易。
分析
要开始使用,请参阅
@KafkaListener#properties
的JavaDoc:value.deserializer
的值用于示例化指定的反序列化器类。让我们遵循调用链:1.如果在
@KafkaListener
注解中指定了这个值,那么可能就不会创建ConsumerFactory.class
的bean,所以Spring会自己创建这个bean类--参见KafkaAutoConfiguration#kafkaConsumerFactory
。1.接下来是使用默认传递表达式keyDeserializer/valueDeserializer =
() -> null
的构造函数将返回对象new DefaultKafkaConsumerFactory(...)
创建为ConsumerFactory<?,?>
1.此工厂用于创建Kafka使用者(入口点是构造函数
KafkaMessageListenerContainer#ListenerConsumer
,然后是KafkaMessageListenerContainer.this.consumerFactory.createConsumer...
)1.在
KafkaConsumer
构造函数中,创建valueDeserializer
对象,因为它为null(对于上面第2点的默认工厂):config.getConfiguredInstance
的实现涉及使用反射和String"com.service.deserializer.CustomMessageDeserializer"
类名通过无参数构造函数示例化反序列化器类溶液
1.要将
value.deserializer
与定制的ObjectMapper
一起使用,必须使用setValueDeserializer(...)
方法自己创建ConsumerFactory
bean。这在JSON.Mapping_Types.重要文档的第二个重要部分中也有提及1.如果你不想创建一个
ConsumerFactory
bean,并且在你的反序列化器中没有复杂的逻辑(你只有return objectMapper.readValue(data, MyMessage.class);
),那么注册DefaultKafkaConsumerFactoryCustomizer
:在这种情况下,你不需要创建你自己的
CustomMessageDeserializer
类(删除它),Spring会自动将消息解析到您的MyMessage
中。@KafkaListener
注解也不应包含属性properties = {"value.deserializer=com.my.kafka_test.component.CustomMessageDeserializer"}
。此DefaultKafkaConsumerFactoryCustomizer
bean将自动用于配置默认ConsumerFactory<?, ?>
(参见KafkaAutoConfiguration#kafkaConsumerFactory
方法的实现)yc0p9oo02#
以下是它对我的作用:
然后,我在配置中定义了2个Bean
注意,接口
ZiDeserializer<SolrInputDocument> deserializer
是我的接口,ZiDeserializerFactory.getInstance(validatedKeyDeserializerName);
返回我的ZiDeserializer
的自定义实现,ZiDeserializer
扩展org.apache.kafka.common.serialization.Deserializer
。