spring 睡在Sping Boot 里

gdx19jrr  于 2023-09-29  发布在  Spring
关注(0)|答案(2)|浏览(83)

我在Sping Boot Web服务中安排了一个定期重复的任务(@EnableSceduling)。当该任务触发时,它调用注册对象的Runnable/run方法。在run方法中,我需要做工作,并且在工作完成之前不退出run方法。问题是,我有其他线程做其他工作,这是需要这个运行线程的工作。所以在run thread中,我有这样的东西:

@Component
public class DoWork implements Runnable {
    
    @override
    public void run() {
    
    // Setup clients.

    // Call services.
    Mono<String> response1 = client1.post();
    response1.subscribe(new MyResponseCallback(), new MyErrorCallback()); 
    
    Mono<String> response2 = client2.post();
    response2.subscribe(new MyResponseCallback(), new MyErrorCallback()); 

    Mono<String> responseX = clientX.post();
    responseX.subscribe(new MyResponseCallback(), new MyErrorCallback());
        
    while(callbacksWorkCompletedFlag == false) {
        
            Thread.sleep (1000);
        }

        // Do computation with callback responses.

        // After computation is completed, exit run method.
    }
}
public class MyResponseCallback implements Consumer<String> {
    
    @override
    public void accept (final Sting response) {
    
        // Do work with response.
    }
}
public class MyErrorCallback implements Consumer<Throwable> {
    
    @override
    public void accept (final Throwable error) {
    
        // Log error.
    }
}

在Java/Sping Boot 中有更好的方法吗?

dpiehjr4

dpiehjr41#

下面是一个使用CompletableFuture的示例。它使用Mono.subscribe的第三个参数来让未来知道它何时完成。

@Override
public void run() {
    Mono<String> response1 = client1.post();
    CompletableFuture<?> future1 = new CompletableFuture<>();
    response1.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> future1.complete(null));

    Mono<String> response2 = client2.post();
    CompletableFuture<?> future2 = new CompletableFuture<>();
    response2.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> future2.complete(null));

    Mono<String> responseX = clientX.post();
    CompletableFuture<?> futureX = new CompletableFuture<>();
    responseX.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> futureX.complete(null));
    
    CompletableFuture.allOf(future1, future2, futureX).join();
}

下面是一个CountDownLatch的例子:

@Override
public void run() {
    CountDownLatch latch = new CountDownLatch(3);

    Mono<String> response1 = client1.post();
    response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);

    Mono<String> response2 = client2.post();
    response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);

    Mono<String> responseX = clientX.post();
    responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
    
    try {
        latch.await();
    } catch (InterruptedException ex) {}
}

另一个CompletableFuture示例:

@Override
public void run() {
    List<CompletableFuture<?>> futures = new ArrayList<>();
    Supplier<Runnable> onDone = () -> {
        CompletableFuture<?> future = new CompletableFuture<>();
        futures.add(future);
        return () -> future.complete(null);
    };

    Mono<String> response1 = client1.post();
    response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());

    Mono<String> response2 = client2.post();
    response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());

    Mono<String> responseX = clientX.post();
    responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());

    CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
}

所有的回调都是必需的吗?

@Override
public void run() {
    // Make requests
    Mono<String> responseMono1 = client1.post();
    Mono<String> responseMono2 = client2.post();
    Mono<String> responseMonoX = clientX.post();
    try {
        // Wait for requests to complete
        String response1 = responseMono1.block();
        String response2 = responseMono2.block();
        String responseX = responseMonoX.block();

        ...
    }
    catch (RuntimeException e) {
        ...
    }
}
vh0rcniy

vh0rcniy2#

你正在使用响应式编程,但忘记这一点,并试图解决它势在必行。你不需要睡觉,而是利用Project Reactor的功能。
您可以使用zip将不同Mono的结果组合在一起,然后map将结果转换为您需要的结果。无需检查boolean使用倒计时锁存器等。

Mono<String> response1 = client1.post();   
Mono<String> response2 = client2.post();
Mono<String> responseX = clientX.post();
Mono<String> result = Mono
  .zip(response1, response2, response3)
  .map(//dosomething with the result of the 3 mono's)
  .subscribe();

你在Consumer中做了什么有点不清楚,但是你可以在每个Mono上使用map(或者如果只是设置布尔值,你可以删除它们)。

相关问题