java 在Spring Webflux中等待的正确方法

vqlkdk9b  于 2023-01-01  发布在  Java
关注(0)|答案(1)|浏览(178)

我们有一个HTTP端点,它接受一个命令并通过Kafka将其发送到另一个地方,然后我们必须等待通过kafka的响应,然后将ResponseEntity返回给客户端。

public Mono<ResponseEntity<byte[]>> post(@RequestBody Mono<byte[]> b) {
    return b.flatMap(
            identifyType -> service.requestProcess(identifyType).flatMap(this::toResponse));
  }

在服务中:

public Mono<String> requestProcess(byte[] b) {
    String r = authenticationService.waitForAuthentication(parseBytes(b));
    if (StringUtils.isEmpty(r)) {
      throw new CustomException("Failed to authenticate");
    }
    return Mono.just(r);
}

在身份验证服务中

public String waitForAuthentication(String data) {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    Listener listener = new Listener(countDownLatch);
    //another setup
    subscribe("topic_abc", String.class, listener);
    sendAuthenticationRequestToKafka(data);
    try {
      boolean b = countDownLatch.await(2, TimeUnit.SECONDS);
      if(b) return listerner.getData();
      else throw new CustomExcetion();
    catch(Exception e) {
      throw new CustomExcetion();
    }
}

听者

@RequiredArgsConstructor
public class Listener implements Consumer<String> {
  @Getter private final CountDownLatch countDownLatch;
  @Getter private String data;

  @Override
  public void accept(String data) {
    this.data = data;
    // count to 0 
    countDownLatch.countDown();
  }
}

我读了文档,所有的文档都说我们不应该在webflux应用程序上休眠/阻塞/等待,因为它会阻塞当前线程,线程无法处理另一个HTTP请求。
但在我们的情况下,等待是必须的。我们不能改变上面的流程,因为客户端是一个黑盒。请建议一个正确的方式等待webflux应用程序/控制器。谢谢

gfttwv5a

gfttwv5a1#

我还没有看到一种真正干净的方法来将异步但非React性的数据放入React性流中,您使用倒计时锁存器的效果和其他方法一样好。
但是,如果使用ReactiveKafka库,则不必执行任何笨拙的阻塞操作。

更新

实际上,我确实找到了一个使用MonoSink的相对优雅的解决方案。

public Mono<ResponseEntity<byte[]>> post(@RequestBody Mono<byte[]> b) {
    return b.map(String::new)
        .flatMap(data -> Mono.<String>create(sink -> {
          Consumer<String> kafkaListener = sink::success;

          subscribe("topic_abc", String.class, kafkaListener);
          sendAuthenticationRequestToKafka(data);
        })
        .map(this::toResponse);
}

这里假设subscribesendAuthenticationRequestToKafka都在作用域中,如果你把它移到服务层,你可以让那个方法返回Mono<String>
另外,我不知道Kafka是如何处理错误的,但您可以创建错误处理程序,将这些错误转发到接收器。

相关问题