Concurrent calls block the server

Posted by koaltpgm on 2021-06-30 in Java

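We have a service that is slow to respond: it takes 2 seconds on average, regardless of the payload we send. We batch multiple requests to save some bandwidth and to avoid overloading that service.
On my side I have a client built around WebClient that collects and executes the requests.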

import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

public class MyClient {
    private final WebClient client;
    private final Sinks.Many<String> requestSink;
    private final Flux<Map.Entry<String, Status>> output;

    public MyClient(URI uri, int queueSize, Duration timeout) {
        this.client = WebClient.create(uri.toString());
        this.requestSink = Sinks.many().multicast().onBackpressureBuffer(100, false);
        // Group order numbers into batches of up to queueSize, or whatever arrived within timeout.
        this.output = this.requestSink.asFlux().bufferTimeout(queueSize, timeout)
                .publishOn(Schedulers.boundedElastic())
                // One GET per batch; a timeout or error yields a response with null statuses.
                .flatMap(orderNumbers -> this.client.get()
                        .uri(uriBuilder -> uriBuilder
                                .path("/orders")
                                .queryParam("q", orderNumbers)
                                .build())
                        .retrieve()
                        .bodyToMono(Response.class)
                        .publishOn(Schedulers.boundedElastic())
                        .timeout(Duration.ofSeconds(3))
                        .onErrorReturn(this.createEmptyResponse(orderNumbers)))
                // Flatten each batch response into individual (orderNumber, status) entries.
                .flatMap(shipmentsResponse -> Flux.fromIterable(shipmentsResponse.getStatuses().entrySet()));
    }

    private Response createEmptyResponse(List<String> orderNumbers) {
        var response = new Response();

        for (String orderNumber : orderNumbers) {
            response.addStatus(orderNumber, null);
        }

        return response;
    }

    public void addOrderNumber(String orderNumber) {
        var emitResult = this.requestSink.tryEmitNext(orderNumber);

        // Retry in a tight loop until the sink accepts the order number.
        while (emitResult.isFailure()) {
            emitResult = this.requestSink.tryEmitNext(orderNumber);
        }
    }

    public Flux<Map.Entry<String, Status>> output() {
        return this.output;
    }
}

public class Response {
    private Map<String, Status> statuses = new HashMap<>();

    @JsonAnySetter
    public void addStatus(String orderNumber, Status status) {
        this.statuses.put(orderNumber, status);
    }

    @JsonAnyGetter
    public Map<String, Status> getStatuses() {
        return this.statuses;
    }
}

public enum Status {
    NEW,
    IN_TRANSIT,
    DELIVERED;
}
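
For readers less familiar with Reactor, the size-or-timeout batching that `bufferTimeout(queueSize, timeout)` provides in the client above can be pictured with plain JDK classes. This is only a rough sketch of the idea, not how Reactor implements it; `SizeOrTimeBatcher` and its methods are hypothetical names that do not appear in the question.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: hands over a batch when it is full or when the timeout fires. */
final class SizeOrTimeBatcher<T> implements AutoCloseable {
    private final int maxSize;
    private final long timeoutMillis;
    private final Consumer<List<T>> onBatch;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private List<T> current = new ArrayList<>();
    private ScheduledFuture<?> pendingFlush;

    SizeOrTimeBatcher(int maxSize, Duration timeout, Consumer<List<T>> onBatch) {
        this.maxSize = maxSize;
        this.timeoutMillis = timeout.toMillis();
        this.onBatch = onBatch;
    }

    synchronized void add(T item) {
        current.add(item);
        if (current.size() >= maxSize) {
            flush(); // batch is full: hand it over immediately
        } else if (current.size() == 1) {
            // The first item of a new batch starts the timeout window.
            Runnable flushTask = this::flush;
            pendingFlush = timer.schedule(flushTask, timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void flush() {
        if (pendingFlush != null) {
            pendingFlush.cancel(false);
            pendingFlush = null;
        }
        if (current.isEmpty()) {
            return;
        }
        List<T> batch = current;
        current = new ArrayList<>();
        // Called while holding the lock, so the callback should be quick.
        onBatch.accept(batch);
    }

    @Override
    public void close() {
        flush();            // hand over whatever is still pending
        timer.shutdownNow();
    }
}
```

With `maxSize = 2`, adding "a", "b" and "c" hands over `[a, b]` right away, while `[c]` waits for the timeout or for `close()`. A timer that fires late can hand over the next batch early, which only makes that batch smaller.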

I use this client in a business service.

public class Service  {

    private final MyClient client;

    public Service(MyClient client) {
        this.client = client;
    }

    public Mono<Response> getStatus(Set<String> orderNumbers) {
        var statuses = this.client.output()
                .filter(e -> orderNumbers.contains(e.getKey()))
                .take(orderNumbers.size())
                .collect(this.nullSafeCollector());

        Flux.fromIterable(orderNumbers)
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(this.client::addOrderNumber);

        return statuses
                .publishOn(Schedulers.boundedElastic())
                .map(t -> new Response(t));
    }

    private <K, V> Collector<Map.Entry<K, V>, Map<K, V>, Map<K, V>> nullSafeCollector() {
        return Collector.of(HashMap::new
                , (m, entry) -> m.put(entry.getKey(), entry.getValue())
                , (m1, m2) -> {
                    m1.putAll(m2);
                    return m1;
                }
                , Collector.Characteristics.UNORDERED);
    }
}

public final class Response {
    private final Map<String, Status> statuses;

    // Constructor name must match the class name for this to compile.
    public Response(Map<String, Status> statuses) {
        this.statuses = statuses;
    }

    public Map<String, Status> getStatuses() {
        return this.statuses;
    }
}
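
The custom `nullSafeCollector` in the service presumably exists because `createEmptyResponse` stores `null` statuses and `Collectors.toMap` rejects null values. The sketch below illustrates that difference with plain JDK types; `NullValueCollectDemo`, `sampleEntries` and the string values are hypothetical stand-ins for the question's `Status` entries.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class NullValueCollectDemo {
    private NullValueCollectDemo() {}

    /** Same shape as the collector in the question: HashMap.put accepts null values. */
    static <K, V> Collector<Map.Entry<K, V>, Map<K, V>, Map<K, V>> nullSafeCollector() {
        return Collector.of(
                HashMap::new,
                (m, e) -> m.put(e.getKey(), e.getValue()),
                (m1, m2) -> {
                    m1.putAll(m2);
                    return m1;
                },
                Collector.Characteristics.UNORDERED);
    }

    /** Hypothetical data: one known status and one missing (null) status. */
    static List<Map.Entry<String, String>> sampleEntries() {
        List<Map.Entry<String, String>> entries = new ArrayList<>();
        entries.add(new SimpleEntry<>("order-1", "DELIVERED"));
        entries.add(new SimpleEntry<>("order-2", null));
        return entries;
    }

    /** Returns true if Collectors.toMap throws on the null value. */
    static boolean toMapRejectsNulls() {
        try {
            sampleEntries().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    /** Collects the same entries with the null-tolerant collector. */
    static Map<String, String> collectNullSafe() {
        return sampleEntries().stream()
                .collect(NullValueCollectDemo.<String, String>nullSafeCollector());
    }
}
```

Here `collectNullSafe()` keeps the `order-2` key with a `null` value, which is what lets the service return an entry for every requested order number even when the remote call fails.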

This business service is used from the following controller.

@RestController
@RequestMapping(value = "/collect", produces = APPLICATION_JSON_VALUE)
public class Controller {

    private final Service service;

    public Controller(Service service) {
        this.service = service;
    }

    @GetMapping()
    @ResponseStatus(value = OK)
    private Mono<Response> getAggregation(@RequestParam(name = "order") Set<String> orderNumbers) {
        return this.service.getStatus(orderNumbers);
    }
}

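This works without any problem for a single request. But when I make several calls to the service at the same time, the service hangs and does not return a response to all callers.
I ran the application with BlockHound to see what is blocking the server, but there was no output. How can I find out what is causing the hang?
I am using Java 15 and Spring Boot 2.4.0, which ships with Reactor 3.4.0.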
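
BlockHound only reports blocking calls made on threads that are marked as non-blocking, so a tight retry loop or a pipeline waiting for a signal that never arrives would not show up in its output. One diagnostic that could complement it is a thread dump taken while the requests hang. The sketch below uses only the JDK's management API; `ThreadDumper` is a hypothetical helper that is not part of the application above, and the `jstack` tool shipped with the JDK gives similar information from outside the process.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

/** Hypothetical helper: renders the state and top stack frames of every live thread. */
final class ThreadDumper {
    private ThreadDumper() {}

    static String dump(int maxFrames) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // false, false: skip lock and synchronizer details to keep the dump cheap.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            StackTraceElement[] stack = info.getStackTrace();
            for (int i = 0; i < Math.min(maxFrames, stack.length); i++) {
                out.append("    at ").append(stack[i]).append('\n');
            }
        }
        return out.toString();
    }
}
```

Calling `ThreadDumper.dump(10)` a few times while the callers are stuck (for example from a temporary debug endpoint) shows where each thread is. A thread that stays `RUNNABLE` inside `addOrderNumber` across several dumps would point at the retry loop, whereas mostly idle, parked threads would suggest that the pipeline is waiting for elements that are never delivered.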
