我正在使用Kafka来使用消息。使用消息时,可能会收到导致DeserializationException的不同消息。我希望跳过导致DeserializationException的记录,并处理不会导致任何问题的记录。
所有Kafka相关属性都通过如下属性进行配置:
kafka:
producer:
bootstrap-servers:
- PRODUCER_BROKERS
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
consumer:
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
bootstrap-servers:
- CONSUMER_BROKERS
properties:
key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
当我在谷歌上搜索时,我得到了一些从How to catch deserialization error in Kafka-Spring?实现ErrorHandler
的解决方案,但由于我使用的是属性,我不确定如何将其绑定到ConcurrentKafkaListenerContainerFactory
。什么是更好的方法?
1条答案
按热度按时间f45qwnt81#
您的配置看起来正确。
默认错误处理程序(
DefaultErrorHandler
)将放弃(记录)具有失败的反序列化错误的记录。