我们有一个HTTP端点,它接受一个命令并通过Kafka将其发送到另一个地方,然后我们必须等待通过kafka的响应,然后将ResponseEntity返回给客户端。
public Mono<ResponseEntity<byte[]>> post(@RequestBody Mono<byte[]> b) {
return b.flatMap(
identifyType -> service.requestProcess(identifyType).flatMap(this::toResponse));
}
在服务中:
public Mono<String> requestProcess(byte[] b) {
String r = authenticationService.waitForAuthentication(parseBytes(b));
if (StringUtils.isEmpty(r)) {
throw new CustomException("Failed to authenticate");
}
return Mono.just(r);
}
在身份验证服务中
public String waitForAuthentication(String data) {
CountDownLatch countDownLatch = new CountDownLatch(1);
Listener listener = new Listener(countDownLatch);
//another setup
subscribe("topic_abc", String.class, listener);
sendAuthenticationRequestToKafka(data);
try {
boolean b = countDownLatch.await(2, TimeUnit.SECONDS);
if(b) return listerner.getData();
else throw new CustomExcetion();
catch(Exception e) {
throw new CustomExcetion();
}
}
听者
@RequiredArgsConstructor
public class Listener implements Consumer<String> {
@Getter private final CountDownLatch countDownLatch;
@Getter private String data;
@Override
public void accept(String data) {
this.data = data;
// count to 0
countDownLatch.countDown();
}
}
我读了文档,所有的文档都说我们不应该在webflux应用程序上休眠/阻塞/等待,因为它会阻塞当前线程,线程无法处理另一个HTTP请求。
但在我们的情况下,等待是必须的。我们不能改变上面的流程,因为客户端是一个黑盒。请建议一个正确的方式等待webflux应用程序/控制器。谢谢
1条答案
按热度按时间gfttwv5a1#
我还没有看到一种真正干净的方法来将异步但非React性的数据放入React性流中,您使用倒计时锁存器的效果和其他方法一样好。
但是,如果使用ReactiveKafka库,则不必执行任何笨拙的阻塞操作。
更新
实际上,我确实找到了一个使用MonoSink的相对优雅的解决方案。
这里假设
subscribe
和sendAuthenticationRequestToKafka
都在作用域中,如果你把它移到服务层,你可以让那个方法返回Mono<String>
。另外,我不知道Kafka是如何处理错误的,但您可以创建错误处理程序,将这些错误转发到接收器。