Kafka: current DLT configuration does not handle DeserializationException correctly

mrwjdhj3  posted 2023-10-15  in  Apache

I'm using spring-kafka 3.0.4 with AWS infrastructure. I want to configure retries and a DLT properly, but I've run into a problem with retry handling. Part of my Kafka configuration:

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
            kafkaConsumerProps,
            StringDeserializer::new,
            () -> new ErrorHandlingDeserializer<>(kafkaAwsGlueValueDeserializer()),
            false
    );
}

@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<String, Object> kafkaTemplate) {
    DefaultKafkaProducerFactory<String, byte[]> defaultKafkaProducerFactory =
            new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties(), new StringSerializer(),
                    new ByteArraySerializer());
    KafkaTemplate<String, byte[]> bytesKafkaTemplate = new KafkaTemplate<>(defaultKafkaProducerFactory);

    Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
    templates.put(Object.class, kafkaTemplate);
    templates.put(byte[].class, bytesKafkaTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

I've also configured a RetryTopicConfiguration bean with some custom properties, pointed it at a handler method, and supplied a KafkaTemplate that uses the GlueSchemaRegistryKafkaSerializer.
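For reference, the RetryTopicConfiguration bean described here might look roughly like the following sketch; the attempt count, back-off, and DLT handler names are illustrative assumptions, not the actual values from my setup:

```java
// Sketch of a RetryTopicConfiguration bean (spring-kafka non-blocking retries).
// maxAttempts, fixedBackOff, and the DLT handler bean/method are assumptions.
@Bean
public RetryTopicConfiguration retryTopicConfiguration(KafkaTemplate<String, Object> kafkaTemplate) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .maxAttempts(3)                                 // total attempts, including the first delivery
            .fixedBackOff(2_000)                            // 2 s between retry-topic deliveries
            .dltHandlerMethod("myDltHandler", "handleDlt")  // bean name + method that consumes DLT records
            .create(kafkaTemplate);                         // template used to publish retry/DLT records
}
```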
Everything works fine if an exception is thrown while a message is being processed in a particular consumer. As I understand it, when a DeserializationException is thrown the deserialized value is null, the raw payload is passed on to the DeadLetterPublishingRecoverer, and the record should be produced to my DLT topic with the ByteArraySerializer. However, I can see that for some reason the GlueSchemaRegistryKafkaSerializer is used to produce the message to my DLT; it fails with the error below and blocks my consumer:

2023-08-31T09:06:47.952Z ERROR 1 --- [ntainer#6-0-C-1] k.r.DeadLetterPublishingRecovererFactory : Record: topic = test.TestData, partition = 1, offset = 0, main topic = test.TestData threw an error at topic test.TestData and won't be retried. Sending to DLT with name test.TestData-test-dlt.
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
...
Caused by: org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize
...
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Exception occurred while de-serializing Avro message
    ...
    at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer.deserialize(GlueSchemaRegistryKafkaDeserializer.java:116) ~[schema-registry-serde-1.1.15.jar!/:na]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-3.3.2.jar!/:na]
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:179) ~[spring-kafka-3.0.4.jar!/:3.0.4]
    ... 15 common frames omitted
Caused by: org.apache.avro.AvroTypeException: Found test.avro.TestData, expecting test.avro.TestData, missing required field testId
    ...
2023-08-31T09:06:47.955Z  WARN 1 --- [ntainer#6-0-C-1] r.DeadLetterPublishingRecovererFactory$1 : Destination resolver returned non-existent partition test.TestData-test-dlt-1, KafkaProducer will determine partition to use for this topic
2023-08-31T09:06:47.955Z ERROR 1 --- [ntainer#6-0-C-1] c.a.s.schemaregistry.utils.AVROUtils     : Unsupported Avro Data Formats
2023-08-31T09:06:47.956Z ERROR 1 --- [ntainer#6-0-C-1] c.a.s.schemaregistry.utils.AVROUtils     : Unsupported Type of Record received
2023-08-31T09:06:47.956Z ERROR 1 --- [ntainer#6-0-C-1] r.DeadLetterPublishingRecovererFactory$1 : Dead-letter publication to test.TestData-test-dlt failed for: test.TestData-1@0
com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Unsupported Type of Record received
    ...
    at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer.prepareInput(GlueSchemaRegistryKafkaSerializer.java:157) 
2023-08-31T09:06:47.957Z ERROR 1 --- [ntainer#6-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Failed to determine if this record (test.TestData-1@0) should be recovererd, including in seeks
org.springframework.kafka.KafkaException: Dead-letter publication to test.TestData-test-dlt failed for:test.TestData-1@0
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.verifySendResult(DeadLetterPublishingRecoverer.java:669) ~[spring-kafka-3.0.4.jar!/:3.0.4]
    ...

What is wrong with my configuration?
I've tried different configurations, but with no success.


pw9qyyiw1#

Use a DelegatingByTypeSerializer:
https://docs.spring.io/spring-kafka/docs/current/reference/html/#by-type
The DelegatingByTypeSerializer was introduced in version 2.8.

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null, new DelegatingByTypeSerializer(Map.of(
                    byte[].class, new ByteArraySerializer(),
                    Bytes.class, new BytesSerializer(),
                    String.class, new StringSerializer())));
}

Starting with version 2.8.3, the serializer can be configured to check whether a map key is assignable from the target object, which is useful when a delegate serializer can serialize subclasses. In that case, if there are ambiguous matches, an ordered Map such as a LinkedHashMap should be provided.
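To see why the map order matters, here is a plain-Java sketch (not spring-kafka code; the class and label names are illustrative) of how an assignability check resolves a delegate: the first key assignable from the record's class wins, so a catch-all `Object.class` entry must come last. This is likely also why the `templates` map in the question, which puts `Object.class` before `byte[].class`, resolves raw `byte[]` records to the Glue-backed template.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: delegate resolution with an assignability check.
// The map is walked in insertion order; the first key for which
// isAssignableFrom(value's class) holds wins.
public class DelegatePicking {

    static String pickDelegate(Map<Class<?>, String> delegates, Object value) {
        for (Map.Entry<Class<?>, String> e : delegates.entrySet()) {
            if (e.getKey().isAssignableFrom(value.getClass())) {
                return e.getValue();
            }
        }
        throw new IllegalStateException("No delegate for " + value.getClass());
    }

    public static void main(String[] args) {
        // Correct order: specific type first, catch-all last.
        Map<Class<?>, String> delegates = new LinkedHashMap<>();
        delegates.put(byte[].class, "ByteArraySerializer");
        delegates.put(Object.class, "GlueSerializer");
        System.out.println(pickDelegate(delegates, new byte[0])); // ByteArraySerializer

        // Catch-all first: byte[] records hit the Glue serializer --
        // the same failure mode as in the question.
        Map<Class<?>, String> wrongOrder = new LinkedHashMap<>();
        wrongOrder.put(Object.class, "GlueSerializer");
        wrongOrder.put(byte[].class, "ByteArraySerializer");
        System.out.println(pickDelegate(wrongOrder, new byte[0])); // GlueSerializer
    }
}
```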
Alternatively, use a DelegatingByTopicSerializer:
https://docs.spring.io/spring-kafka/docs/current/reference/html/#by-topic
The latter can be configured via properties; the DelegatingByTypeSerializer must be injected into the producer factory through the constructor or a setter.
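A sketch of the property-based route for the DelegatingByTopicSerializer follows; the topic patterns are assumptions based on the topic names in the question's logs, and the pattern matching the DLT must map to the ByteArraySerializer:

```java
// Illustrative producer properties for DelegatingByTopicSerializer.
// The topic patterns below are assumptions -- adapt them to your topic names.
Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DelegatingByTopicSerializer.class);
// "pattern:serializerClass" pairs; the -dlt pattern routes dead letters to raw bytes
props.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
        "test\\..*-dlt:org.apache.kafka.common.serialization.ByteArraySerializer, "
                + "test\\..*:com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer");
ProducerFactory<String, Object> pf = new DefaultKafkaProducerFactory<>(props);
```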
