如何在SpringKafka异常处理程序中获取原始记录

w3nuxt5m  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(334)

我正在为SpringKafka项目实现异常处理,我有自己的deadletterpublishingrecoverer,它处理kafka侦听器中发生的异常,整个流程是完美的,即当我在逻辑中抛出任何异常时,我可以根据框架实现记录并发送到dlq主题。消费者记录出现问题,如果我更改了消费者记录值中的任何内容,同一消费者记录将发布到dlq主题中,但更改在我的senairo中实际上是错误的,我想记录原始消息。
...

@KafkaListener(topics = "${message.topic.name}",containerFactory = "kafkaListenerContainerFactory")  
public void listenGroupFoo(ConsumerRecord<String, MyEvent> consumerRecord) throws InterruptedException, ExecutionException {         

    System.out.println("Received Message" + consumerRecord.value());

    MyEvent consumedEvent=consumerRecord.value();
    consumedEvent.setQuantity(1000);

    throw new InvalidProductCodeException();

}

...
我发送到主题的实际消息只包含10个数量,我正在更改一些内容,如将数量更改为1000,并引发一些异常(如处理时发生的任何异常)。。。

Received MessageOrderObject CreatedDate=Mon Sep 14 19:38:15 IST 2020, productCode=TES, price=10, quantity=10, customerId=0002

...
在我抛出错误后,我的记录将
...

Received MessageOrderObject [createdDate=Tue Sep 15 13:20:16 IST 2020, productCode=TES, price=10, quantity=10, customerId=0002]
Error  Record  OrderObject [createdDate=Tue Sep 15 13:20:16 IST 2020, productCode=TES, price=10, quantity=1000, customerId=0002]

这是我的dlq子类
...

@Component
 public class DeadLetterSubclass extends DeadLetterPublishingRecoverer {

@Value(value="${error.topic.name}")
static
String errorTopicName;

BusinessUtils utils=new BusinessUtils();
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> 
 MY_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition("stock-order-error-topic", cr.partition());

public DeadLetterSubclass(KafkaOperations<? extends Object, ? extends Object> template) {
    super(template,MY_DESTINATION_RESOLVER);        
    this.setHeadersFunction((consumerRecord,exception)->{
        System.out.println("Error  Record  "+consumerRecord.value());           
        return consumerRecord.headers();
    });

}

...
我想记录发生异常时要发布的原始事件对象(consumerrecord)。在我的示例中,我的实际orderquantity是10,但order是1000,这不是实际的订单。

pokxtpni

pokxtpni1#

既然你对原著有严格的引用 ConsumerRecord 如果需要在错误处理程序中保持不变,则不应对其进行变异。
框架无法预料您可能会改变侦听器中的值。“以防万一”复制一份会有太多的开销。
您不应该对原始值进行变异—而是克隆它,然后对克隆进行变异。

相关问题