使用onerrorresume处理使用reactor kafka发布到kafka的有问题的有效负载

xbp102n0  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(655)

我使用React堆Kafka发送Kafka消息,并接收和处理它们。在接收kakfa负载时,我会进行一些反序列化,如果出现异常,我只想记录该负载(通过保存到mongo),然后继续接收其他负载。
为此,我使用以下方法-

@EventListener(ApplicationStartedEvent.class)
public void kafkaReceiving() {
   for(Flux<ReceiverRecord<String, Object>> flux: kafkaService.getFluxReceives()) {
       flux.delayUntil(//some function to do something)
       .doOnNext(r -> r.receiverOffset().acknowledge())
       .onErrorResume(this::handleException()) // here I'll just save to mongo 
       .subscribe();
   }
}

private Publisher<? extends ReceiverRecord<String,Object>> handleException(object ex) {
 // save to mongo
 return Flux.empty();
}

在这里,我希望每当我在接收有效负载时遇到异常时,onerrorresume应该捕获它并登录到mongo,然后当我发送到kafka队列时,我应该能够继续接收更多消息。但是,我看到在异常之后,即使调用了onerrorresume方法,但是我无法再处理发送到kakfa主题的消息。我有什么遗漏吗?

cwtwac6a

cwtwac6a1#

正如@bsideup所提到的,我最终没有从反序列化程序抛出异常,因为kafka无法提交该记录的偏移量,并且没有干净的方法可以忽略该记录并继续使用记录,因为我们没有记录的偏移量信息(因为它的格式不正确)。因此,即使我尝试使用React性错误操作符忽略记录,轮询也会获取相同的记录,然后消费者就会陷入困境

pxq42qpu

pxq42qpu2#

如果需要优雅地处理错误,可以添加 onErrorResume 内部 delayUntil :

flux
    .delayUntil(r -> {
        return process(r)
            .onErrorReturn(e -> saveToMongo(r));
    });
    .doOnNext(r -> r.receiverOffset().acknowledge())
    .subscribe();

React运算符将错误视为终端信号,如果您的内部逻辑(内部 delayUntil )抛出错误, delayUntil 将终止序列,并且 onErrorReturn 之后 delayUntil 不会让它继续处理Kafka的事件。

相关问题