我收到了一堆反序列化失败之前,我的 Kafka Listener
被击中了。我在调查加里·拉塞尔建造的东西,但在使用上有问题。我所有的东西都是通过属性文件配置的。
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
所以如果我添加这些,我的理解是它在消费者记录的头中 Package 了一个错误?我的最终目标是让任何反序列化异常命中我拥有的某个自定义类,这样我就可以处理我想用它做的事情。即,转发给我的死信处理程序上传失败的数据到s3。
我试着在kafkalistener中添加errorhandler标志,但也没用。
更新的属性配置
我已经更新了我的配置,我仍然不清楚这是否正确。它不起作用,所以我想不会。
没有调用任何自定义代码
spring.kafka.consumer.properties.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.function=com.thing.cyclic.service.FailedFooProvider
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.properties.auto.register.schemas=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.listener.ack-mode=manual_immediate
巴德福
public class BadFoo {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
失败的FooProvider
public class FailedFooProvider implements Function<FailedDeserializationInfo, String> {
@Override
public String apply(FailedDeserializationInfo info) {
System.out.println("");
return "";
}
}
1条答案
按热度按时间ztmd8pv51#
请参阅此处和此处的文档。
还可以看看
DeadLetterPublishingRecoverer
代码,可用于将失败的记录发布到其他主题。您可以在此之后对代码进行建模,以获取包含失败代码的头byte[]
.https://github.com/spring-projects/spring-kafka/blob/fa5c35e9b15c4cecfc6ea2bbbf9e7745bc5d9f75/spring-kafka/src/main/java/org/springframework/kafka/listener/deadletterpublishingrecoverer.java#l169-l178型
回收器与
SeekToCurrentErrorHandler
.将错误处理程序配置为
@Bean
Spring Boot会自动将它连接到容器中。