我正在开发一个缓冲聚合服务。聚合服务调用3个不同的服务(让我们调用s1,s2和s3)并返回聚合结果。聚合服务本身由客户端通过将用于调用s1、s2和s3的参数来调用。为了避免s1、s2和s3被大量调用淹没,我们对调用进行缓冲。我们积累每个服务的参数,直到我们达到至少5个参数。一旦我们为一个服务积累了5个参数,我们就发出调用并获得结果。只有在调用了3个服务并返回结果时,才会返回聚合结果。
以下是我的React式实现:
@Service
public class MyAggregationService {
@Autowired
private PricingClient pricingClient;
@Autowired
private TrackClient trackClient;
@Autowired
private ShipmentsClient shipmentsClient;
private final Many<List<String>> pricingSink;
private final Many<List<String>> trackSink;
private final Many<List<String>> shipmentsSink;
Flux<Mono<Map<String, Optional<Float>>>> pricingMonoFlux;
Flux<Mono<Map<String, Optional<String>>>> trackMonoFlux;
Flux<Mono<Map<String, Optional<List<String>>>>> shipmentsMonoFlux;
public MyAggregationService() {
pricingSink = Sinks.many().multicast().onBackpressureBuffer();
trackSink = Sinks.many().multicast().onBackpressureBuffer();
shipmentsSink = Sinks.many().multicast().onBackpressureBuffer();
}
@PostConstruct
public void setUp() {
pricingMonoFlux = pricingSink.asFlux().log().flatMapIterable(Function.identity())
.bufferTimeout(5, Duration.of(5, SECONDS))
.publishOn(Schedulers.boundedElastic())
.map(pricingClient::getPricing)
.subscribeOn(Schedulers.boundedElastic());
trackMonoFlux = trackSink.asFlux().log().flatMapIterable(Function.identity())
.bufferTimeout(5, Duration.of(5, SECONDS))
.publishOn(Schedulers.boundedElastic())
.map(trackClient::getTrack)
.subscribeOn(Schedulers.boundedElastic());
shipmentsMonoFlux = shipmentsSink.asFlux().log().flatMapIterable(Function.identity())
.bufferTimeout(5, Duration.of(5, SECONDS))
.publishOn(Schedulers.boundedElastic())
.map(shipmentsClient::getShipment)
.subscribeOn(Schedulers.boundedElastic());
}
public Mono<AggregateResult> aggregate(Optional<List<String>> pricing,
Optional<List<String>> track,
Optional<List<String>> shipments) {
pricing.filter(not(List::isEmpty)).ifPresent(l -> pricingSink.tryEmitNext(l));
track.filter(not(List::isEmpty)).ifPresent(l -> trackSink.tryEmitNext(l));
shipments.filter(not(List::isEmpty)).ifPresent(l -> shipmentsSink.tryEmitNext(l));
pricingMonoFlux.subscribeOn(Schedulers.boundedElastic());
shipmentsMonoFlux.subscribeOn(Schedulers.boundedElastic());
trackMonoFlux.subscribeOn(Schedulers.boundedElastic());
var zipped = Flux.zip(pricingMonoFlux, trackMonoFlux, shipmentsMonoFlux);
Mono<AggregateResult> next = zipped
.flatMap(t -> getZip(t))
.map(t -> new AggregateResult(t.getT1(), t.getT2(), t.getT3()))
.next();
return next;
}
private static Mono<Tuple3<Map<String, Optional<Float>>, Map<String, Optional<String>>, Map<String, Optional<List<String>>>>> getZip(Tuple3<Mono<Map<String, Optional<Float>>>, Mono<Map<String, Optional<String>>>, Mono<Map<String, Optional<List<String>>>>> t) {
return Mono.zip(t.getT1(), t.getT2(), t.getT3());
}
字符串
}
我使用了3个接收器(每个服务的参数各一个),并使用了buffer函数。这里的问题是Flume在一次呼叫后被取消。
解决办法是什么?
1条答案
按热度按时间5cg8jx4n1#
问题是Flips在聚合方法中订阅了
Sinks.Many
Flux,这不是预期的行为,因为您在聚合方法中再次使用subscribeOn(Schedulers.boundedElastic())
,这可能会导致您的Flux
完成并取消您的Sink。subscribeOn
调用最好只在初始化阶段进行,而不是在每次调用聚合方法时进行。你可以尝试低于1字符串
setUp
方法直接订阅接收器。在调用每个服务之前,许多Flux将传入元素缓冲到大小为5的列表中。aggregate方法只会将新参数推送到每个Sink中,而不会再次订阅它们。另外,这里使用的buffer(5)
方法只有在缓冲区中累积了5个项时才会发出项。如果系统可以容忍一些延迟,您可以考虑使用bufferTimeout(5, Duration.of(5, SECONDS))
来发出项目,即使它们在提供的持续时间内没有达到缓冲区中的5个项目。Mono.zip
将合并每个Mono中最近发布的元素。如果对这些方法的调用并不总是以相同的顺序完成,那么它的行为可能不会像您期望的那样。这将解决在一次调用之后取消接收器的问题,因为接收器现在仅订阅一次,并且在第一次调用之后不会取消订阅。