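We have a service that is slow to respond, about 2 seconds on average, regardless of the payload we send. We batch multiple requests to save some bandwidth and to avoid choking that service.
I have a client, built on WebClient, that collects and executes the requests.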
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

public class MyClient {
    private final WebClient client;
    private final Sinks.Many<String> requestSink;
    private final Flux<Map.Entry<String, Status>> output;

    public MyClient(URI uri, int queueSize, Duration timeout) {
        this.client = WebClient.create(uri.toString());
        this.requestSink = Sinks.many().multicast().onBackpressureBuffer(100, false);
        // Group order numbers into batches of at most queueSize, or whatever arrived within timeout.
        this.output = this.requestSink.asFlux().bufferTimeout(queueSize, timeout)
                .publishOn(Schedulers.boundedElastic())
                .flatMap(orderNumbers -> this.client.get()
                        .uri(uriBuilder -> uriBuilder
                                .path("/orders")
                                .queryParam("q", orderNumbers)
                                .build())
                        .retrieve()
                        .bodyToMono(Response.class)
                        .publishOn(Schedulers.boundedElastic())
                        .timeout(Duration.ofSeconds(3))
                        .onErrorReturn(this.createEmptyResponse(orderNumbers)))
                .flatMap(shipmentsResponse -> Flux.fromIterable(shipmentsResponse.getStatuses().entrySet()));
    }

    // Fallback used on error or timeout: every requested order number maps to null.
    private Response createEmptyResponse(List<String> orderNumbers) {
        var response = new Response();
        for (String orderNumber : orderNumbers) {
            response.addStatus(orderNumber, null);
        }
        return response;
    }

    public void addOrderNumber(String orderNumber) {
        // Retry in a loop until the sink accepts the element.
        var emitResult = this.requestSink.tryEmitNext(orderNumber);
        while (emitResult.isFailure()) {
            emitResult = this.requestSink.tryEmitNext(orderNumber);
        }
    }

    public Flux<Map.Entry<String, Status>> output() {
        return this.output;
    }
}
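import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;

public class Response {
    private Map<String, Status> statuses = new HashMap<>();

    @JsonAnySetter
    public void addStatus(String orderNumber, Status status) {
        this.statuses.put(orderNumber, status);
    }

    @JsonAnyGetter
    public Map<String, Status> getStatuses() {
        return this.statuses;
    }
}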
public enum Status {
    NEW,
    IN_TRANSIT,
    DELIVERED;
}
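For readers less familiar with Reactor, the size-based half of the batching done by bufferTimeout(queueSize, timeout) above can be illustrated in plain Java. This is only a sketch: the class BatchSketch and the method toBatches are hypothetical names that are not part of my client, and the timeout half (emitting a partial batch once the duration elapses) is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of the original client.
class BatchSketch {

    // Splits the input into consecutive batches of at most maxSize elements.
    static <T> List<List<T>> toBatches(List<T> items, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxSize) {
            int end = Math.min(start + maxSize, items.size());
            // Copy the sub-list so each batch is independent of the input list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> orderNumbers = List.of("1", "2", "3", "4", "5");
        System.out.println(toBatches(orderNumbers, 2)); // prints [[1, 2], [3, 4], [5]]
    }
}
```

In the real client each such batch becomes one GET request to /orders, so five order numbers with a queueSize of 2 would result in three calls to the slow service.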
I use this client in a business service.
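import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collector;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class Service {
    private final MyClient client;

    public Service(MyClient client) {
        this.client = client;
    }

    public Mono<Response> getStatus(Set<String> orderNumbers) {
        // Listen on the shared output and keep only the entries this caller asked for.
        var statuses = this.client.output()
                .filter(e -> orderNumbers.contains(e.getKey()))
                .take(orderNumbers.size())
                .collect(this.nullSafeCollector());
        // Push the requested order numbers into the client's sink.
        Flux.fromIterable(orderNumbers)
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(this.client::addOrderNumber);
        return statuses
                .publishOn(Schedulers.boundedElastic())
                .map(t -> new Response(t));
    }

    // Collects entries into a HashMap; unlike Collectors.toMap, this accepts null values.
    private <K, V> Collector<Map.Entry<K, V>, Map<K, V>, Map<K, V>> nullSafeCollector() {
        return Collector.of(HashMap::new
                , (m, entry) -> m.put(entry.getKey(), entry.getValue())
                , (m1, m2) -> {
                    m1.putAll(m2);
                    return m1;
                }
                , Collector.Characteristics.UNORDERED);
    }
}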
import java.util.Map;

// Response type returned by the business service (constructed via new Response(t) in Service).
public final class Response {
    private final Map<String, Status> statuses;

    public Response(Map<String, Status> statuses) {
        this.statuses = statuses;
    }

    public Map<String, Status> getStatuses() {
        return this.statuses;
    }
}
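To make the filter / take / collect step in the service easier to follow, here is a minimal sketch of the same selection logic on a plain java.util.stream.Stream instead of a Flux. The class NullSafeCollectSketch, the methods select and sampleEntries, and the use of String values in place of the Status enum are all assumptions made for illustration. The custom collector is needed because the client's fallback response maps order numbers to null, and Collectors.toMap throws a NullPointerException on null values.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collector;

// Hypothetical illustration only; String stands in for the Status enum.
class NullSafeCollectSketch {

    // Same shape as nullSafeCollector() in the service: HashMap.put accepts null values.
    static <K, V> Collector<Map.Entry<K, V>, Map<K, V>, Map<K, V>> nullSafeCollector() {
        return Collector.of(
                HashMap::new,
                (m, entry) -> m.put(entry.getKey(), entry.getValue()),
                (m1, m2) -> {
                    m1.putAll(m2);
                    return m1;
                },
                Collector.Characteristics.UNORDERED);
    }

    // Keeps only the wanted keys and stops after wanted.size() matching entries.
    static Map<String, String> select(List<Map.Entry<String, String>> entries, Set<String> wanted) {
        return entries.stream()
                .filter(e -> wanted.contains(e.getKey()))
                .limit(wanted.size())
                .collect(nullSafeCollector());
    }

    // Sample data: "B2" has a null status, as produced by the client's fallback response.
    static List<Map.Entry<String, String>> sampleEntries() {
        return List.of(
                new SimpleEntry<String, String>("A1", "NEW"),
                new SimpleEntry<String, String>("B2", null),
                new SimpleEntry<String, String>("C3", "DELIVERED"));
    }

    public static void main(String[] args) {
        Map<String, String> result = select(sampleEntries(), Set.of("A1", "B2"));
        System.out.println(result.containsKey("B2")); // prints true
        System.out.println(result.get("B2"));         // prints null
        System.out.println(result.containsKey("C3")); // prints false
    }
}
```

Note that the sketch works on a finite list. In the service, the take(orderNumbers.size()) step only completes once that many matching entries have actually arrived on the shared output.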
This business service is used from the following controller.
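import static org.springframework.http.HttpStatus.OK;
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;

import java.util.Set;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping(value = "/collect", produces = APPLICATION_JSON_VALUE)
public class Controller {
    private final Service service;

    public Controller(Service service) {
        this.service = service;
    }

    @GetMapping()
    @ResponseStatus(value = OK)
    private Mono<Response> getAggregation(@RequestParam(name = "order") Set<String> orderNumbers) {
        return this.service.getStatus(orderNumbers);
    }
}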
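This works without any problems for a single request. But when I make multiple concurrent calls to the service, it hangs and does not return a response to all callers.
I ran the application with BlockHound to see what is blocking the server, but it produced no output. How can I figure out what is causing the hang?
I am using Java 15 and Spring Boot 2.4.0, which comes with Reactor 3.4.0.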