我正在使用带有kafka活页夹和avro的springcloudstreamversion2.2.0。显然,一个不正确的记录被发布到kafka主题之一,它导致所有使用者返回反序列化错误并进行某种无限重试。
Error deserializing key/value for partition realtime-object-1 at offset 908. If needed, please seek past the record to continue consumption. Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 13 Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class au.com.brolly.avro.model.GenericTaskTriggerCapsule specified in writer's schema whilst finding reader's schema for a SpecificRecord.
从技术上讲,应该有一种方法可以指定反序列化异常的策略。我可以找到一些不同的政策,比如 logAndContinue
以及 sendToDlq
但它们是Kafka流,我没有在我的应用程序中使用。如果有人能帮我理解我在这里缺少什么,我将不胜感激。
1条答案
按热度按时间ioekq8ef1#
请参阅spring kafka文档。
使用
ErrorHandlingDeserializer2
Package avro反序列化程序。异常将被发送到错误处理程序(默认情况下,它只记录错误)。你可以使用
ListenerContainerCustomizer
添加一个SeekToCurrentErrorHandler
(可选带有DeadLetterPublishingRecoverer
)将反序列化失败的记录发送到另一个主题。编辑
下面是一个用SpringCloudStream配置它的示例。。。
像以前一样向配置中添加任何其他属性(如schema registry url)(ehd2配置它的委托反序列化器)。