java 缓冲和响应式聚合服务

wqsoz72f  于 2023-08-01  发布在  Java
关注(0)|答案(1)|浏览(74)

我正在开发一个缓冲聚合服务。聚合服务调用3个不同的服务(让我们调用s1,s2和s3)并返回聚合结果。聚合服务本身由客户端通过将用于调用s1、s2和s3的参数来调用。为了避免s1、s2和s3被大量调用淹没,我们对调用进行缓冲。我们积累每个服务的参数,直到我们达到至少5个参数。一旦我们为一个服务积累了5个参数,我们就发出调用并获得结果。只有在调用了3个服务并返回结果时,才会返回聚合结果。
以下是我的React式实现:

@Service
public class MyAggregationService {

    @Autowired
    private PricingClient pricingClient;
    @Autowired
    private TrackClient trackClient;
    @Autowired
    private ShipmentsClient shipmentsClient;

    private final Many<List<String>> pricingSink;
    private final Many<List<String>> trackSink;
    private final Many<List<String>> shipmentsSink;
    Flux<Mono<Map<String, Optional<Float>>>> pricingMonoFlux;
    Flux<Mono<Map<String, Optional<String>>>> trackMonoFlux;
    Flux<Mono<Map<String, Optional<List<String>>>>> shipmentsMonoFlux;

    public MyAggregationService() {
        pricingSink = Sinks.many().multicast().onBackpressureBuffer();
        trackSink = Sinks.many().multicast().onBackpressureBuffer();
        shipmentsSink = Sinks.many().multicast().onBackpressureBuffer();

    }

    @PostConstruct
    public void setUp() {
        pricingMonoFlux = pricingSink.asFlux().log().flatMapIterable(Function.identity())
                .bufferTimeout(5, Duration.of(5, SECONDS))
                .publishOn(Schedulers.boundedElastic())
                .map(pricingClient::getPricing)
                .subscribeOn(Schedulers.boundedElastic());

        trackMonoFlux = trackSink.asFlux().log().flatMapIterable(Function.identity())
                .bufferTimeout(5, Duration.of(5, SECONDS))
                .publishOn(Schedulers.boundedElastic())
                .map(trackClient::getTrack)
                .subscribeOn(Schedulers.boundedElastic());

        shipmentsMonoFlux = shipmentsSink.asFlux().log().flatMapIterable(Function.identity())
                .bufferTimeout(5, Duration.of(5, SECONDS))
                .publishOn(Schedulers.boundedElastic())
                .map(shipmentsClient::getShipment)
                .subscribeOn(Schedulers.boundedElastic());
    }

    public Mono<AggregateResult> aggregate(Optional<List<String>> pricing,
                                           Optional<List<String>> track,
                                           Optional<List<String>> shipments) {

        pricing.filter(not(List::isEmpty)).ifPresent(l -> pricingSink.tryEmitNext(l));
        track.filter(not(List::isEmpty)).ifPresent(l -> trackSink.tryEmitNext(l));
        shipments.filter(not(List::isEmpty)).ifPresent(l -> shipmentsSink.tryEmitNext(l));

        pricingMonoFlux.subscribeOn(Schedulers.boundedElastic());
        shipmentsMonoFlux.subscribeOn(Schedulers.boundedElastic());
        trackMonoFlux.subscribeOn(Schedulers.boundedElastic());

        var zipped = Flux.zip(pricingMonoFlux, trackMonoFlux, shipmentsMonoFlux);
        Mono<AggregateResult> next = zipped
                .flatMap(t -> getZip(t))
                .map(t -> new AggregateResult(t.getT1(), t.getT2(), t.getT3()))
                .next();
        return next;
    }

    private static Mono<Tuple3<Map<String, Optional<Float>>, Map<String, Optional<String>>, Map<String, Optional<List<String>>>>> getZip(Tuple3<Mono<Map<String, Optional<Float>>>, Mono<Map<String, Optional<String>>>, Mono<Map<String, Optional<List<String>>>>> t) {
        return Mono.zip(t.getT1(), t.getT2(), t.getT3());
    }

字符串
}
我使用了3个接收器(每个服务的参数各一个),并使用了buffer函数。这里的问题是Flume在一次呼叫后被取消。
解决办法是什么?

5cg8jx4n

5cg8jx4n1#

问题是Flips在聚合方法中订阅了Sinks.Many Flux,这不是预期的行为,因为您在聚合方法中再次使用subscribeOn(Schedulers.boundedElastic()),这可能会导致您的Flux完成并取消您的Sink。subscribeOn调用最好只在初始化阶段进行,而不是在每次调用聚合方法时进行。你可以尝试低于1

@Service
public class MyAggregationService {

    @Autowired
    private PricingClient pricingClient;
    @Autowired
    private TrackClient trackClient;
    @Autowired
    private ShipmentsClient shipmentsClient;

    private final Sinks.Many<List<String>> pricingSink = Sinks.many().replay().all();
    private final Sinks.Many<List<String>> trackSink = Sinks.many().replay().all();
    private final Sinks.Many<List<String>> shipmentsSink = Sinks.many().replay().all();

    private final Flux<List<Mono<Map<String, Optional<Float>>>>> pricingFlux;
    private final Flux<List<Mono<Map<String, Optional<String>>>>> trackFlux;
    private final Flux<List<Mono<Map<String, Optional<List<String>>>>> shipmentsFlux;

    public MyAggregationService() {
        pricingFlux = pricingSink.asFlux()
                .flatMapIterable(Function.identity())
                .buffer(5)
                .map(pricingList -> pricingClient.getPricing(pricingList));

        trackFlux = trackSink.asFlux()
                .flatMapIterable(Function.identity())
                .buffer(5)
                .map(trackList -> trackClient.getTrack(trackList));

        shipmentsFlux = shipmentsSink.asFlux()
                .flatMapIterable(Function.identity())
                .buffer(5)
                .map(shipmentsList -> shipmentsClient.getShipment(shipmentsList));
    }

    public Flux<AggregateResult> aggregate(Optional<List<String>> pricing,
                                           Optional<List<String>> track,
                                           Optional<List<String>> shipments) {

        pricing.filter(not(List::isEmpty)).ifPresent(l -> pricingSink.tryEmitNext(l));
        track.filter(not(List::isEmpty)).ifPresent(l -> trackSink.tryEmitNext(l));
        shipments.filter(not(List::isEmpty)).ifPresent(l -> shipmentsSink.tryEmitNext(l));

        return Flux.zip(pricingFlux, trackFlux, shipmentsFlux)
                .flatMap(tuple -> Mono.zip(tuple.getT1(), tuple.getT2(), tuple.getT3())
                        .map(t -> new AggregateResult(t.getT1(), t.getT2(), t.getT3())));
    }
}

字符串
setUp方法直接订阅接收器。在调用每个服务之前,许多Flux将传入元素缓冲到大小为5的列表中。aggregate方法只会将新参数推送到每个Sink中,而不会再次订阅它们。另外,这里使用的buffer(5)方法只有在缓冲区中累积了5个项时才会发出项。如果系统可以容忍一些延迟,您可以考虑使用bufferTimeout(5, Duration.of(5, SECONDS))来发出项目,即使它们在提供的持续时间内没有达到缓冲区中的5个项目。Mono.zip将合并每个Mono中最近发布的元素。如果对这些方法的调用并不总是以相同的顺序完成,那么它的行为可能不会像您期望的那样。
这将解决在一次调用之后取消接收器的问题,因为接收器现在仅订阅一次,并且在第一次调用之后不会取消订阅。

相关问题