SpringWebFlux执行并行http请求并反序列化响应

r8xiu3jd  于 2021-07-16  发布在  Java
关注(0)|答案(2)|浏览(504)

我有一个 List<String> 包含url,我想对其中的每个url执行get请求 List .
这些请求应同时提出。在所有的要求完成后,我想有一个 List<CustomModel> 包含所有反序列化的响应。
所以我创建了一个方法来发出http请求

public Flux<JsonNode> performGetRequest(String url) {
    WebClient webClient = WebClient.create(String.format("%s%s", API_BASE_URL, url));

    return webClient.get()
            .retrieve()
            .bodyToFlux(JsonNode.class);
}

上面的方法是这样调用的

public List<CustomModel> fetch(List<String> urls) {
    return Flux.fromIterable(urls)
            .parallel()
            .runOn(Schedulers.boundedElastic())
            .flatMap(this::performGetRequest)
            .flatMap(jsonNode -> Flux.fromIterable(customDeserialize(jsonNode)))
            .sequential()
            .collectList()
            .flatMapMany(Flux::fromIterable)
            .collectList()
            .block();
}

对于每个响应,我都使用一个自定义方法来反序列化响应

private List<CustomModel> customDeserialize(final JsonNode jsonNodeResponse) {
    List<CustomModel> customModelList = new ArrayList<>();

    for (JsonNode block : jsonNodeResponse) {
        // deserialize the response, create an instance of CustomModel class 
        // and add it to customModelList
    }

    return customModelList;
}

问题是即使我使用 parallel() 方法整个过程可能不是并行运行的。完成的时间表明我做错了什么。
我错过什么了吗?

q1qsirdb

q1qsirdb1#

问题是,即使我使用parallel()方法,整个进程也可能没有并行运行。完成的时间表明我做错了什么。
我错过什么了吗?
既然你打电话来 block 我假设您正在运行一个mvcservlet应用程序,该应用程序正在使用 WebClient 只适用于休息电话。
如果您没有运行完整的webflux应用程序,那么您的应用程序将启动一个事件循环,该循环将处理计划的所有事件。如果运行一个完整的webflux应用程序,您将得到与运行机器上的内核一样多的事件循环。
通过使用 parallel React堆文件上说:
要获得平行通量,可以使用 parallel() 任何焊剂的操作员。就其本身而言,这种方法并不能使工作并行化。相反,它将工作负载划分为“rails”(默认情况下,rails的数量与cpu内核的数量相同)。
为了告诉结果parallelflux在何处运行每一个轨道(扩展来说,并行运行轨道),您必须使用runon(调度器)。请注意,有一个建议用于并行工作的专用调度器:schedulers.parallel()。
您正在创建一个 boundedElastic 未针对并行工作进行优化的调度器。
但我想说的是,你在做什么 async i/o 不是 parallel 指出这一点非常重要。在并行运行时,很可能不会获得任何性能提升,因为大多数i/o都会发出请求,然后等待响应。 ParellelFlux 将确保所有的cpu核心都被使用,但也有一些惩罚。有一个设置时间来确保所有的内核都开始工作,然后需要完成的工作不是cpu密集型的,它们只是发出1000个请求,然后所有的线程都完成了,并且必须等待响应。
工人需要在核心上设置,信息需要发送到每个核心,检索等。 parallel 当您有cpu密集型工作(每个事件都需要在多个核上执行大量计算)时,它将获得大部分好处。但对于 async 定期工作 Flux 很可能就足够了。
这是西蒙·巴斯尔说的é 其中一个reactor开发人员必须说明如何在reactor中运行i/o工作,并行还是异步
同样值得一提的是 boundedElastic 调度器被调整为在纯webflux应用程序中作为常规servlet行为的回退来阻止工作。
您在servlet应用程序中运行webflux,因此您获得的好处可能不如webflux应用程序充分。

lp0sw83n

lp0sw83n2#

我不是100%确定这是否是这里的问题,但我注意到当与 WebClient 以及 ParallelFlux ,即 WebClient 只是归还 Publisher 为了回应( bodyToMono / bodyToFlux ),不适用于实际请求。
考虑用 Flux.defer / Mono.defer 得到一个 Publisher 已经申请了,例如:

.flatMap(url -> Flux.defer(() -> performGetRequest(url)))

相关问题