我有一个 List<String>
包含url,我想对其中的每个url执行get请求 List
.
这些请求应同时提出。在所有的要求完成后,我想有一个 List<CustomModel>
包含所有反序列化的响应。
所以我创建了一个方法来发出http请求
public Flux<JsonNode> performGetRequest(String url) {
WebClient webClient = WebClient.create(String.format("%s%s", API_BASE_URL, url));
return webClient.get()
.retrieve()
.bodyToFlux(JsonNode.class);
}
上面的方法是这样调用的
public List<CustomModel> fetch(List<String> urls) {
return Flux.fromIterable(urls)
.parallel()
.runOn(Schedulers.boundedElastic())
.flatMap(this::performGetRequest)
.flatMap(jsonNode -> Flux.fromIterable(customDeserialize(jsonNode)))
.sequential()
.collectList()
.flatMapMany(Flux::fromIterable)
.collectList()
.block();
}
对于每个响应,我都使用一个自定义方法来反序列化响应
private List<CustomModel> customDeserialize(final JsonNode jsonNodeResponse) {
List<CustomModel> customModelList = new ArrayList<>();
for (JsonNode block : jsonNodeResponse) {
// deserialize the response, create an instance of CustomModel class
// and add it to customModelList
}
return customModelList;
}
问题是即使我使用 parallel()
方法整个过程可能不是并行运行的。完成的时间表明我做错了什么。
我错过什么了吗?
2条答案
按热度按时间q1qsirdb1#
问题是,即使我使用parallel()方法,整个进程也可能没有并行运行。完成的时间表明我做错了什么。
我错过什么了吗?
既然你打电话来
block
我假设您正在运行一个mvcservlet应用程序,该应用程序正在使用WebClient
只适用于休息电话。如果您没有运行完整的webflux应用程序,那么您的应用程序将启动一个事件循环,该循环将处理计划的所有事件。如果运行一个完整的webflux应用程序,您将得到与运行机器上的内核一样多的事件循环。
通过使用
parallel
React堆文件上说:要获得平行通量,可以使用
parallel()
任何焊剂的操作员。就其本身而言,这种方法并不能使工作并行化。相反,它将工作负载划分为“rails”(默认情况下,rails的数量与cpu内核的数量相同)。为了告诉结果parallelflux在何处运行每一个轨道(扩展来说,并行运行轨道),您必须使用runon(调度器)。请注意,有一个建议用于并行工作的专用调度器:schedulers.parallel()。
您正在创建一个
boundedElastic
未针对并行工作进行优化的调度器。但我想说的是,你在做什么
async i/o
不是parallel
指出这一点非常重要。在并行运行时,很可能不会获得任何性能提升,因为大多数i/o都会发出请求,然后等待响应。ParellelFlux
将确保所有的cpu核心都被使用,但也有一些惩罚。有一个设置时间来确保所有的内核都开始工作,然后需要完成的工作不是cpu密集型的,它们只是发出1000个请求,然后所有的线程都完成了,并且必须等待响应。工人需要在核心上设置,信息需要发送到每个核心,检索等。
parallel
当您有cpu密集型工作(每个事件都需要在多个核上执行大量计算)时,它将获得大部分好处。但对于async
定期工作Flux
很可能就足够了。这是西蒙·巴斯尔说的é 其中一个reactor开发人员必须说明如何在reactor中运行i/o工作,并行还是异步
同样值得一提的是
boundedElastic
调度器被调整为在纯webflux应用程序中作为常规servlet行为的回退来阻止工作。您在servlet应用程序中运行webflux,因此您获得的好处可能不如webflux应用程序充分。
lp0sw83n2#
我不是100%确定这是否是这里的问题,但我注意到当与
WebClient
以及ParallelFlux
,即WebClient
只是归还Publisher
为了回应(bodyToMono
/bodyToFlux
),不适用于实际请求。考虑用
Flux.defer
/Mono.defer
得到一个Publisher
已经申请了,例如: