Spring Boot: looping dynamically over a list of URLs with RestTemplate

hvvq6cgz  posted on 2023-04-28  in  Spring

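I have a list of URLs, and the size of the list changes every time. I need to make the code below dynamic: I want to loop over the list and, if possible, size the CountDownLatch dynamically so that I can be sure every URL has returned a response.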

public class ApiCallerService {
    
    private RestTemplate restTemplate;
    
    public ApiCallerService(RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
    }

    public Map<String, String> callApis() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);

        String response1 = "";
        String response2 = "";
        String response3 = "";

        String url1 = "http://....";
        String url2 = "http://....";
        String url3 = "http://....";

        new Thread(() -> {
            response1 = restTemplate.getForObject(url1, String.class);
            latch.countDown();
        }).start();

        new Thread(() -> {
            response2 = restTemplate.getForObject(url2, String.class);
            latch.countDown();
        }).start();

        new Thread(() -> {
            response3 = restTemplate.getForObject(url3, String.class);
            latch.countDown();
        }).start();

        latch.await();

        Map<String, String> map = new HashMap<>();
        map.put(url1, response1);
        map.put(url2, response2);
        map.put(url3, response3);

        return map;
    }

}

Thanks

bxjv4tth1#

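Since you are using Spring Boot, use Spring's task execution abstraction to submit the jobs asynchronously and collect the results. Spring Boot creates a TaskExecutor by default (configurable through the spring.task namespace), and you can use that.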

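import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.web.client.RestTemplate;

public class ApiCallerService {
    
    private final RestTemplate restTemplate;
    private final AsyncTaskExecutor taskExecutor;

    public ApiCallerService(RestTemplate restTemplate, AsyncTaskExecutor taskExecutor) {
        this.restTemplate = restTemplate;
        this.taskExecutor = taskExecutor;
    }

    public Map<String, String> callApis() {
       
       List<String> urls = ... // List of urls;
       // Submit one task per URL; the executor decides how many run at the same time.
       List<CompletableFuture<Result>> futures = new ArrayList<>();
       for (String url : urls) {
         futures.add(taskExecutor.submitCompletable(() -> getResult(url)));
       }
       // Wait for every future, then collect url -> response.
       // Note: Collectors.toMap rejects duplicate keys and null values.
       return futures.stream()
                .map(this::get)
                .collect(Collectors.toMap(Result::url, Result::response));
    }

    // Blocks until the future completes and wraps any failure in an unchecked exception.
    private Result get(CompletableFuture<Result> cf) {
      try {
        return cf.get();
      } catch (Exception ex) {
        throw new IllegalStateException(ex);
      }
    }

    private Result getResult(String url) {
        String response = restTemplate.getForObject(url, String.class);
        return new Result(url, response);
    }
}

public record Result(String url, String response) {}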
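
The same submit-then-collect pattern can be tried outside a Spring context. The following is only a minimal sketch under stated assumptions: a plain `ExecutorService` takes the place of the Boot-provided `AsyncTaskExecutor`, and a hypothetical `fetch` function stands in for `restTemplate.getForObject(url, String.class)`. The names `FanOutSketch` and `callAll` are invented for this illustration and do not come from the original answer.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class FanOutSketch {

    // Hypothetical helper: 'fetch' stands in for restTemplate.getForObject(url, String.class).
    static Map<String, String> callAll(List<String> urls,
                                       Function<String, String> fetch,
                                       ExecutorService pool) {
        // Submit one task per URL, however many the list contains.
        Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
        for (String url : urls) {
            futures.put(url, CompletableFuture.supplyAsync(() -> fetch.apply(url), pool));
        }
        // join() blocks until each future is done, so the returned map is complete.
        // A failed call surfaces here as a CompletionException.
        Map<String, String> results = new LinkedHashMap<>();
        futures.forEach((url, future) -> results.put(url, future.join()));
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<String> urls = List.of("http://example.invalid/a", "http://example.invalid/b");
            // Fake fetcher so the sketch runs without a network.
            System.out.println(callAll(urls, url -> "response for " + url, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the futures are created in a loop, nothing in the method depends on how many URLs there are, and no explicit latch is needed: joining every future plays the same role as `latch.await()`.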

Another option is to use WebClient, which is reactive (and therefore asynchronous) by default and also lets you collect the results.

public class ApiCallerService {
    
    private final WebClient webClient;

    public ApiCallerService(WebClient webClient) {
        this.webClient = webClient;
    }

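    public Map<String, String> callApis() {
       
       List<String> urls = ... // List of urls;
       return Flux.fromIterable(urls)
                // flatMap subscribes to several inner Monos at once, so the requests overlap
                .flatMap(this::getResult)
                .collectMap(Result::url, Result::response)
                // block() waits for all responses; do not call it from a reactive thread
                .block();
    }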

    private Mono<Result> getResult(String url) {
        Mono<String> response = webClient.get().uri(url)
                                         .retrieve()
                                         .bodyToMono(String.class);
        return response.map( (res) -> new Result(url, res));
    }
}

public record Result(String url, String response) {}

Either way, don't create your own Thread instances; use a pool. If you keep creating threads, you will eventually run out of resources.
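
If you would rather stay close to the CountDownLatch approach from the question, the latch can simply be sized from the list. The following is a minimal sketch, not a definitive implementation: it assumes a fixed-size `ExecutorService` instead of raw threads, and a hypothetical `fetch` function in place of the RestTemplate call. The names `LatchSketch` and `callAll` are invented for this example. Results go into a `ConcurrentHashMap` because several threads write to the map.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class LatchSketch {

    // Hypothetical helper: 'fetch' stands in for restTemplate.getForObject(url, String.class).
    static Map<String, String> callAll(List<String> urls,
                                       Function<String, String> fetch,
                                       ExecutorService pool) {
        // The latch count follows the list size instead of a hard-coded 3.
        CountDownLatch latch = new CountDownLatch(urls.size());
        Map<String, String> results = new ConcurrentHashMap<>();
        for (String url : urls) {
            pool.execute(() -> {
                try {
                    String response = fetch.apply(url);
                    // ConcurrentHashMap rejects null values, so empty responses are skipped.
                    if (response != null) {
                        results.put(url, response);
                    }
                } finally {
                    // Count down even if the call throws, otherwise await() never returns.
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for responses", e);
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<String> urls = List.of("http://example.invalid/a", "http://example.invalid/b");
            // Fake fetcher so the sketch runs without a network.
            System.out.println(callAll(urls, url -> "response for " + url, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

In this sketch a URL whose call fails or returns null is simply absent from the map. Whether that is acceptable, or whether failures should be recorded separately, depends on what the caller needs.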

ddarikpa2#

You can use Java Streams to process the list dynamically, as shown below:

String[] urls = {"http://....", "http://....", "http://...."};
int noOfThreads = 3; // This can also be urls.length
ForkJoinPool forkJoinPool = new ForkJoinPool(noOfThreads);
// In practice, a parallel stream started from inside this pool runs on the pool's threads
// rather than on the common pool. get() can throw InterruptedException or ExecutionException.
Map<String, String> map = forkJoinPool.submit(() -> Arrays.stream(urls).parallel().map(url -> {
    String response = restTemplate.getForObject(url, String.class);
    return Pair.of(url, response);
}).collect(Collectors.toMap(Pair::getFirst, Pair::getSecond))).get();
forkJoinPool.shutdown();
