因此,我的用例是在springwebflux应用程序中使用kafka的消息,同时使用projectreactor以React式风格编程,并按照从kafka接收消息的顺序对每条消息执行非阻塞操作。系统也应该能够自行恢复。
以下是要从中使用的代码段:
Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
return receiver.receive();
});
messages.map(this::transformToOutputFormat)
.map(this::performAction)
.flatMapSequential(receiverRecordMono -> receiverRecordMono)
.doOnNext(record -> record.receiverOffset().acknowledge())
.doOnError(error -> logger.error("Error receiving record", error))
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.subscribe();
如您所见,我所做的是:从kafka获取消息,将其转换为用于新目的地的对象,然后将其发送到目的地,然后确认偏移量以将消息标记为已消费和已处理。以与从kafka使用的消息相同的顺序确认偏移量是非常重要的,这样我们就不会将偏移量移到未完全处理的消息之外(包括将一些数据发送到目的地)。所以我用的是 flatMapSequential
为了确保这一点。
为了简单起见,我们假设 transformToOutputFormat()
方法是身份转换。
public ReceiverRecord<Integer, DataDocument> transformToOutputFormat(ReceiverRecord<Integer, DataDocument> record) {
return record;
}
这个 performAction()
方法需要在网络上执行一些操作,比如调用httprestapi。因此,适当的api返回mono,这意味着需要订阅链。另外,我需要 ReceiverRecord
通过此方法返回,以便可以在上面的flatmapsequential()运算符中确认偏移量。因为我需要订阅mono,所以我用 flatMapSequential
上面。如果不是的话,我可以用 map
相反。
public Mono<ReceiverRecord<Integer, DataDocument>> performAction(ReceiverRecord<Integer, DataDocument> record) {
return Mono.just(record)
.flatMap(receiverRecord ->
HttpClient.create()
.port(3000)
.get()
.uri("/makeCall?data=" + receiverRecord.value().getData())
.responseContent()
.aggregate()
.asString()
)
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.then(Mono.just(record));
在这种方法中,我有两个相互矛盾的需求:1。订阅进行http调用2的链。归还收款人记录
使用flatmap()意味着我的返回类型变为mono。在同一位置使用doonnext()将保留链中的receiverrecord,但不允许自动订阅httpclient响应。
我不能补充 .subscribe()
之后 asString()
,因为我想等到完全接收到http响应后再确认偏移量。
我不能用 .block()
因为它运行在一个平行线程上。
因此,我需要作弊并退还 record
方法范围中的对象。
另一件事是在内部重试 performAction
它切换线程。由于flatmapsequential()急切地订阅外部流量中的每个mono,这意味着虽然可以保证按顺序确认偏移量,但我们不能保证http调用 performAction
将按相同的顺序执行。
所以我有两个问题。
有可能回来吗 record
以自然的方式而不是返回方法范围对象?
有没有可能确保http调用和偏移确认的执行顺序与执行这些操作的消息的顺序相同?
1条答案
按热度按时间w8biq8rn1#
这是我想出的解决办法。
我没有使用flatmapsequential来订阅performaction mono和preserve sequence,而是延迟了kafka接收器对更多消息的请求,直到执行了操作。这就实现了我所需要的一次性处理。
因此,performaction不需要返回receiverrecord的mono。我还将其简化为: