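I am using spring-kafka 3.0.4 on AWS infrastructure. I want to configure retries and a DLT correctly, but I have run into a problem with retry handling. Part of my Kafka configuration: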
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
            kafkaConsumerProps,
            StringDeserializer::new,
            () -> new ErrorHandlingDeserializer<>(kafkaAwsGlueValueDeserializer()),
            false
    );
}

@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<String, Object> kafkaTemplate) {
    DefaultKafkaProducerFactory<String, byte[]> defaultKafkaProducerFactory =
            new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties(), new StringSerializer(),
                    new ByteArraySerializer());
    KafkaTemplate<String, byte[]> bytesKafkaTemplate = new KafkaTemplate<>(defaultKafkaProducerFactory);
    Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
    templates.put(Object.class, kafkaTemplate);
    templates.put(byte[].class, bytesKafkaTemplate);
    DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(templates);
    return deadLetterPublishingRecoverer;
}
I also have a RetryTopicConfiguration bean configured with some custom properties, with a DLT handler method specified for it and a Kafka template that uses GlueSchemaRegistryKafkaSerializer.
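Everything works fine when an exception is thrown while a message is being processed in a particular consumer. As I understand it, when a DeserializationException is thrown, a null value is passed to the DeadLetterPublishingRecoverer, and ByteArraySerializer should be used to produce the record for my DLT topic. However, I can see that for some reason GlueSchemaRegistryKafkaSerializer is used to produce the message to my DLT. It fails because of the malformed message and blocks my consumer: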
2023-08-31T09:06:47.952Z ERROR 1 --- [ntainer#6-0-C-1] k.r.DeadLetterPublishingRecovererFactory : Record: topic = test.TestData, partition = 1, offset = 0, main topic = test.TestData threw an error at topic test.TestData and won't be retried. Sending to DLT with name test.TestData-test-dlt.
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
...
Caused by: org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize
...
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Exception occurred while de-serializing Avro message
...
at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer.deserialize(GlueSchemaRegistryKafkaDeserializer.java:116) ~[schema-registry-serde-1.1.15.jar!/:na]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-3.3.2.jar!/:na]
at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:179) ~[spring-kafka-3.0.4.jar!/:3.0.4]
... 15 common frames omitted
Caused by: org.apache.avro.AvroTypeException: Found test.avro.TestData, expecting test.avro.TestData, missing required field testId
...
2023-08-31T09:06:47.955Z WARN 1 --- [ntainer#6-0-C-1] r.DeadLetterPublishingRecovererFactory$1 : Destination resolver returned non-existent partition test.TestData-test-dlt-1, KafkaProducer will determine partition to use for this topic
2023-08-31T09:06:47.955Z ERROR 1 --- [ntainer#6-0-C-1] c.a.s.schemaregistry.utils.AVROUtils : Unsupported Avro Data Formats
2023-08-31T09:06:47.956Z ERROR 1 --- [ntainer#6-0-C-1] c.a.s.schemaregistry.utils.AVROUtils : Unsupported Type of Record received
2023-08-31T09:06:47.956Z ERROR 1 --- [ntainer#6-0-C-1] r.DeadLetterPublishingRecovererFactory$1 : Dead-letter publication to test.TestData-test-dlt failed for: test.TestData-1@0
com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Unsupported Type of Record received
...
at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer.prepareInput(GlueSchemaRegistryKafkaSerializer.java:157)
2023-08-31T09:06:47.957Z ERROR 1 --- [ntainer#6-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Failed to determine if this record (test.TestData-1@0) should be recovererd, including in seeks
org.springframework.kafka.KafkaException: Dead-letter publication to test.TestData-test-dlt failed for:test.TestData-1@0
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.verifySendResult(DeadLetterPublishingRecoverer.java:669) ~[spring-kafka-3.0.4.jar!/:3.0.4]
...
What is wrong with my configuration?
I have tried several different configurations, without success.
1 Answer
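Use a DelegatingByTypeSerializer.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#by-type
Version 2.8 introduced the DelegatingByTypeSerializer. Starting with version 2.8.3, you can configure the serializer to check whether the map key is assignable from the target object, which is useful when a delegate serializer can serialize subclasses. In that case, if there are ambiguous matches, an ordered Map, such as a LinkedHashMap, should be provided.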
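As a minimal sketch of the by-type approach, the producer factory behind the KafkaTemplate that the retry topic configuration uses could be given a DelegatingByTypeSerializer as its value serializer. The class name DltProducerConfig and the bean names are hypothetical, and it is an assumption that the Glue serializer receives its settings when the factory configures the serializers from the producer properties. The byte[] entry is listed first so that raw payloads from failed deserialization go to ByteArraySerializer, and every other type falls through to the Glue serializer.

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;

import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer;

// Hypothetical configuration class; names are illustrative only.
@Configuration
class DltProducerConfig {

    @Bean
    ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
        // Ordered map: byte[] is registered before the catch-all Object entry.
        Map<Class<?>, Serializer<?>> delegates = new LinkedHashMap<>();
        delegates.put(byte[].class, new ByteArraySerializer());
        // Assumption: the Glue serializer is configured from the producer properties.
        delegates.put(Object.class, new GlueSchemaRegistryKafkaSerializer());

        // 'true' enables the assignability check, so any non-byte[] value
        // is matched by the Object.class entry.
        return new DefaultKafkaProducerFactory<>(
                kafkaProperties.buildProducerProperties(),
                new StringSerializer(),
                new DelegatingByTypeSerializer(delegates, true));
    }

    @Bean
    KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

With a template like this, a single KafkaTemplate can presumably serve both the main topic and the DLT, so a separate byte[] template would not be needed.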
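Alternatively, use a DelegatingByTopicSerializer.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#by-topic
The latter can be configured with properties; the DelegatingByTypeSerializer has to be injected into the producer factory via a constructor or setter.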
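For the by-topic variant, a property-only configuration might look like the sketch below. The class DltByTopicProps and the ".*-dlt" pattern are hypothetical. The keys are written as string literals to keep the sketch dependency-free; to my knowledge they correspond to the VALUE_SERIALIZATION_TOPIC_CONFIG and VALUE_SERIALIZATION_TOPIC_DEFAULT constants available on DelegatingByTopicSerializer.

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that builds producer properties for by-topic delegation.
final class DltByTopicProps {

    static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        // Let the delegating serializer pick the real value serializer per topic.
        props.put("value.serializer",
                "org.springframework.kafka.support.serializer.DelegatingByTopicSerializer");
        // Topics whose names end in "-dlt" get raw bytes (pattern is an assumption).
        props.put("spring.kafka.value.serialization.bytopic.config",
                ".*-dlt:org.apache.kafka.common.serialization.ByteArraySerializer");
        // Every other topic falls back to the Glue serializer.
        props.put("spring.kafka.value.serialization.bytopic.default",
                "com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer");
        return props;
    }
}

This presumably only fits when every record sent to the DLT is a raw byte[]. Records that failed inside the listener carry an already deserialized object, so if both kinds can reach the DLT, the by-type serializer is likely the better match.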