json消息的kafka dlq配置

nfs0ujit  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(288)

我有一个带两个处理器的scdf流。流的目标是从源主题读取jsonb,并将其处理/解析到jdbc postgres中。大多数情况下,json足够大,可以完全解析并得到错误。

org.apache.kafka.common.errors.RecordTooLargeException: The message is 4277122 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

虽然增加max.request.size有助于解决错误,但我的主要任务是实现dlq的功能。
我尝试配置kafka dlq以获取dlq主题中的错误消息。以下是我实现的配置:

spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.input.consumer.dlqName=error.dlq
spring.cloud.stream.kafka.bindings.input.consumer.auto-commit-offset=true
spring.cloud.stream.kafka.bindings.input.consumer.auto-commit-on-error=true
spring.cloud.stream.kafka.bindings.input.consumer.dlqProducerProperties.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.input.consumer.dlqProducerProperties.configuration.value.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=sendToDlq
spring.cloud.stream.bindings.input.consumer.max-attempts=1
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.consumer.headerMode=headers

在运行scdf流之后,dlq主题正在创建到kafka中,但是记录过大的异常消息仍然没有移动到dlq主题。
是否有任何配置丢失或输入错误?我也问过其他类似的问题,但解决不了这个问题。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题