用属性文件处理SpringKafka中的java错误?

2o7dmzc5  于 2021-06-26  发布在  Java
关注(0)|答案(1)|浏览(422)

我收到了一堆反序列化失败之前,我的 Kafka Listener 被击中了。我在调查加里·拉塞尔建造的东西,但在使用上有问题。我所有的东西都是通过属性文件配置的。

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer

所以如果我添加这些,我的理解是它在消费者记录的头中 Package 了一个错误?我的最终目标是让任何反序列化异常命中我拥有的某个自定义类,这样我就可以处理我想用它做的事情。即,转发给我的死信处理程序上传失败的数据到s3。
我试着在kafkalistener中添加errorhandler标志,但也没用。

更新的属性配置

我已经更新了我的配置,我仍然不清楚这是否正确。它不起作用,所以我想不会。
没有调用任何自定义代码

spring.kafka.consumer.properties.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.function=com.thing.cyclic.service.FailedFooProvider

spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.properties.auto.register.schemas=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.listener.ack-mode=manual_immediate

巴德福

public class BadFoo {

    private final FailedDeserializationInfo failedDeserializationInfo;

    public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
        this.failedDeserializationInfo = failedDeserializationInfo;
    }

    public FailedDeserializationInfo getFailedDeserializationInfo() {
        return this.failedDeserializationInfo;
    }
}

失败的FooProvider

public class FailedFooProvider implements Function<FailedDeserializationInfo, String> {
    @Override
    public String apply(FailedDeserializationInfo info) {
        System.out.println("");
        return "";
    }
}
ztmd8pv5

ztmd8pv51#

请参阅此处和此处的文档。
还可以看看 DeadLetterPublishingRecoverer 代码,可用于将失败的记录发布到其他主题。您可以在此之后对代码进行建模,以获取包含失败代码的头 byte[] .
https://github.com/spring-projects/spring-kafka/blob/fa5c35e9b15c4cecfc6ea2bbbf9e7745bc5d9f75/spring-kafka/src/main/java/org/springframework/kafka/listener/deadletterpublishingrecoverer.java#l169-l178型
回收器与 SeekToCurrentErrorHandler .
将错误处理程序配置为 @Bean Spring Boot会自动将它连接到容器中。

相关问题