java—在使用流量(包括重试)时按顺序调用非阻塞操作

lyr7nygr  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(404)

因此,我的用例是在springwebflux应用程序中使用kafka的消息,同时使用projectreactor以React式风格编程,并按照从kafka接收消息的顺序对每条消息执行非阻塞操作。系统也应该能够自行恢复。
以下是要从中使用的代码段:

Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
        KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
        return receiver.receive();
    });

    messages.map(this::transformToOutputFormat)
            .map(this::performAction)
            .flatMapSequential(receiverRecordMono -> receiverRecordMono)
            .doOnNext(record -> record.receiverOffset().acknowledge())
            .doOnError(error -> logger.error("Error receiving record", error))
            .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
            .subscribe();

如您所见,我所做的是:从kafka获取消息,将其转换为用于新目的地的对象,然后将其发送到目的地,然后确认偏移量以将消息标记为已消费和已处理。以与从kafka使用的消息相同的顺序确认偏移量是非常重要的,这样我们就不会将偏移量移到未完全处理的消息之外(包括将一些数据发送到目的地)。所以我用的是 flatMapSequential 为了确保这一点。
为了简单起见,我们假设 transformToOutputFormat() 方法是身份转换。

public ReceiverRecord<Integer, DataDocument> transformToOutputFormat(ReceiverRecord<Integer, DataDocument> record) {
    return record;
}

这个 performAction() 方法需要在网络上执行一些操作,比如调用httprestapi。因此,适当的api返回mono,这意味着需要订阅链。另外,我需要 ReceiverRecord 通过此方法返回,以便可以在上面的flatmapsequential()运算符中确认偏移量。因为我需要订阅mono,所以我用 flatMapSequential 上面。如果不是的话,我可以用 map 相反。

public Mono<ReceiverRecord<Integer, DataDocument>> performAction(ReceiverRecord<Integer, DataDocument> record) {
    return Mono.just(record)
            .flatMap(receiverRecord ->
                    HttpClient.create()
                            .port(3000)
                            .get()
                            .uri("/makeCall?data=" + receiverRecord.value().getData())
                            .responseContent()
                            .aggregate()
                            .asString()
            )
            .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
            .then(Mono.just(record));

在这种方法中,我有两个相互矛盾的需求:1。订阅进行http调用2的链。归还收款人记录
使用flatmap()意味着我的返回类型变为mono。在同一位置使用doonnext()将保留链中的receiverrecord,但不允许自动订阅httpclient响应。
我不能补充 .subscribe() 之后 asString() ,因为我想等到完全接收到http响应后再确认偏移量。
我不能用 .block() 因为它运行在一个平行线程上。
因此,我需要作弊并退还 record 方法范围中的对象。
另一件事是在内部重试 performAction 它切换线程。由于flatmapsequential()急切地订阅外部流量中的每个mono,这意味着虽然可以保证按顺序确认偏移量,但我们不能保证http调用 performAction 将按相同的顺序执行。
所以我有两个问题。
有可能回来吗 record 以自然的方式而不是返回方法范围对象?
有没有可能确保http调用和偏移确认的执行顺序与执行这些操作的消息的顺序相同?

w8biq8rn

w8biq8rn1#

这是我想出的解决办法。

Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
    KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
    return receiver.receive();
});

messages.map(this::transformToOutputFormat)
        .delayUntil(this::performAction)
        .doOnNext(record -> record.receiverOffset().acknowledge())
        .doOnError(error -> logger.error("Error receiving record", error))
        .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
        .subscribe();

我没有使用flatmapsequential来订阅performaction mono和preserve sequence,而是延迟了kafka接收器对更多消息的请求,直到执行了操作。这就实现了我所需要的一次性处理。
因此,performaction不需要返回receiverrecord的mono。我还将其简化为:

public Mono<String> performAction(ReceiverRecord<Integer, DataDocument> record) {
    HttpClient.create()
        .port(3000)
        .get()
        .uri("/makeCall?data=" + receiverRecord.value().getData())
        .responseContent()
        .aggregate()
        .asString()
        .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5));
}

相关问题