有没有办法在JavaSpring中合并列表中不可预知的monos数量?

7fhtutme  于 2021-06-30  发布在  Java
关注(0)|答案(1)|浏览(286)

我有一个mono<list<(一些自制的对象)>>方法,它接受一个urn列表的单个参数。

public Mono <List <MyObject>> loadMyObjectsFromUrnAsync (Iterable <String> urns) {...}

根据URN的数量,我必须发出许多异步http请求,每个请求都由一个mono表示。由于我希望程序在运行这个方法并执行所有请求时能够移动到程序的其他部分,我该如何压缩各种mono并返回单个mono的结果?

... {
   List <Mono <MyObject>> myList = new Arraylist <Mono <MyObject>> ();
   while (urns.hasNext()) {
      // create a Mono HTTP request with the current iterated URN 
      myList.add(request); }
   return mono.zip(myList);
   }...

上面的代码显然不起作用,它只是希望让我的问题更容易理解。我在网上找不到任何关于如何在不指定每个mono的情况下压缩mono列表的文档

moiiocjp

moiiocjp1#

上面的问题可能没有被清楚地表达出来,所以我将重新措辞,然后描述我的解决方案。希望这能帮上忙!
还有一个重要的注意事项:这个解决方案是在JavaSpring中使用webclient执行对远程服务器的异步客户端请求。
根据URN的数量,我必须发出许多异步http请求,每个请求都由一个mono表示。
假设有一个函数接收http端点的arraylist并返回myobject类型的arraylist。我想让这个函数获取这些端点,并使用monos执行相应数量的http请求,monos将填充该数量的myobject pojo(普通的旧java对象),但是,由于monos的数量会因端点的数量而异,因此我无法显式定义每个mono。我使用monos允许我并行地执行这些请求,或者同时执行,而不是一个接一个地依次执行,并且我希望在所有monos都完成并且可以生成myobject的示例时得到通知。
由于我希望程序能够在运行此方法并执行所有请求时移到程序的其他部分,因此如何压缩各种monos并在完成时返回单个合并流的结果?
组织一组不可预测的monos的解决方案是存储一个monos的iterable,并将其合并到一个flux中,然后缓冲和阻塞该flux以生成java对象!
(理解monos和fluxes对于完全理解这个解决方案很重要)

public Mono<MyObject> getOneObject(String endpoint) {
    return webClient.get()
        .uri(endpoint)
        .retrieve()
        .bodyToMono(MyObject.class)
}

上面的函数基于一个端点检索一个对象。现在,让我们基于给定的端点列表创建monos的iterable。

public Iterable<Mono<MyObject>> getMultipleObjects(ArrayList<String> endpoints) {
    ArrayList<Mono<MyObject>> monos = new ArrayList<Mono<MyObject>> ();
    for (String endpoint: endpoints) {
         monos.add(getOneObject(endpoint));
    }
    return monos;
}

上面的函数返回一个arraylist(在return语句中转换为iterable),该列表通过遍历端点列表来填充。现在,让我们把monos的iterable合并成一个奇异通量!

public Flux<MyObject> mergeMultipleMonos(Iterable<Mono<MyObject>> multipleMonos) {
    return Flux.merge(multipleMonos);
}

现在,您所要做的就是缓冲和阻止通量以生成对象的arraylist!

public ArrayList<MyObject> listOfObjects(Flux<MyObject> fluxOfObjects) {
    return fluxOfObjects.buffer().blockLast();
}

flux.buffer()将把所有传入的值收集到一个列表中,flux.blocklast()将订阅这个流并无限期地阻塞,直到上游发送最后一个值或完成。
这就是为什么不可预测的monos数量(在本例中代表异步客户端httpweb请求)可以合并成一个单一的流aka flux,当每个请求都返回时,它将填充所有的值!

相关问题