我有一个Asyn调用节俭接口:
public CompletableFuture<List<Long>> getFavourites(Long userId){
CompletableFuture<List<Long>> future = new CompletableFuture();
OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
callback.addObserver(new OctoObserver() {
@Override
public void onSuccess(Object o) {
future.complete((List<Long>) o);
}
@Override
public void onFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
});
try {
recommendAsyncService.getFavorites(userId, callback);
} catch (TException e) {
log.error("OctoCall RecommendAsyncService.getFavorites", e);
}
return future;
}
现在它返回一个CompletableFuture。然后我用Flux调用它来做一些处理器。
public Flux<Product> getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
// do not like it
List<Long> recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);
System.out.println(recommendList);
return Flux.fromIterable(recommendList)
.flatMap(id -> Mono.defer(() -> Mono.just(Product.builder()
.userId(userId)
.productId(id)
.productType((int) (Math.random()*100))
.build())))
.take(5)
.publishOn(mdpScheduler);
}
但是,我想从getFavourites
方法中获取Flux,并且可以在getRecommend
方法中使用它。
或者,您可以推荐一个Flux API
,我可以将List<Long> recommendList
转换为Flux<Long> recommendFlux
。
2条答案
按热度按时间mkh04yzy1#
要将
CompletableFuture<List<T>>
转换为Flux<T>
,您可以将Mono#fromFuture
与Mono#flatMapMany
一起使用:在回调中异步接收的
List<T>
也可以转换为Flux<T>
,而不使用CompletableFuture
。您可以直接将Mono#create
与Mono#flatMapMany
一起使用:或者简单地使用
Flux#create
在一次通过中进行多个发射:67up9zun2#
简单的解决方案是使用
Flux.fromIterable
,如下例所示Springboot版本3.1.0
注意:不建议这样做,因为当你在React管道中工作时,它不会完全React,这里是创建一个List。