带SpringKafka的Kafka死信队列(dlq)

jei2mxaa  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(622)

在spring boot 2.0应用程序中,使用spring kafka 2.1.x实现死信队列(dlq)概念的最佳方法是什么?使用spring kafka 2.1.x将某个bean的@kafkalistener方法处理失败的所有消息发送到某个预定义的kafka dlq主题,并且不丢失单个消息?
所以Kafka的记录是:
已成功处理,
未能处理并发送到dlq主题,
处理失败,未发送到dlq主题(由于意外问题),因此将再次被侦听器使用。
我尝试用errorhandler的自定义实现创建侦听器容器,使用kafkatemplate将记录发送到dlq主题失败。使用禁用的自动提交和记录确认模式。

spring.kafka.enable-auto-ack=false
spring.kafka.listener.ack-mode=RECORD

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ErrorHandler {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Value("${dlqTopic}")
    private String dlqTopic;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        log.error("Error, sending to DLQ...");
        kafkaTemplate.send(dlqTopic, record.key(), record.value());
    }
}

这个实现似乎不能保证第3项。如果将在dlqerrorhandler中引发异常,则侦听器不会再次使用记录。
使用事务侦听器容器会有帮助吗?

factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);

使用springkafka实现dlq概念有什么方便的方法吗?
更新日期:28/03/2018
感谢garyrussell的回答,我通过实现dlqerrorhandler实现了所需的行为,如下所示

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
    ...
    @Override
    public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        Consumerrecord<?, ? record = records.get(0);
        try {
            kafkaTemplate.send("dlqTopic", record.key, record.value());
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
        } catch (Exception e) {
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
            throw new KafkaException("Seek to current after exception", thrownException);
        }
    }
}

如果consumer poll返回3条记录(1、2、3),而第二条记录无法处理,则使用这种方法:
1将被处理
2将无法处理并发送到dlq
3由于consumer seek to record.offset()+1,它将被传递给侦听器
如果发送到dlq失败,消费者将查找record.offset(),记录将重新传递到侦听器(发送到dlq可能会失效)。

3wabscal

3wabscal1#

看到了吗 SeekToCurrentErrorHandler .
当发生异常时,它将查找使用者,以便在下一次轮询时重新传递所有未处理的记录。
如果dlq写入失败,您可以使用相同的技术(例如子类)写入dlq并查找当前偏移量(和其他未处理的偏移量),如果dlq写入成功,则只查找剩余的记录。

相关问题