∮抽象的问题∮
有没有什么方法可以按照外部可观测的原始顺序来消耗mergeMap
的结果,同时仍然允许内部可观测并行运行?
更详细的解释
让我们来看两个合并Map运算符:
...它接受Map回调,以及可以并发运行的内部可观察对象的数量:
of(1, 2, 3, 4, 5, 6).pipe(
mergeMap(number => api.get('/double', { number }), 3)
);
这将分别为1
、2
和3
发出3个并行请求,其中一个请求完成后,它将为4
发出另一个请求,以此类推,始终保持3个并发请求,直到所有值都被处理完。
但是,由于先前的请求可能在后续请求之前完成,因此生成的值可能会乱序。
[2, 4, 6, 8, 10, 12]
......我们实际上可能会得到:
[4, 2, 8, 10, 6, 12] // or any other permutation
...输入concatMap
。此运算符确保所有可观测值都按原始顺序连接,因此:
of(1, 2, 3, 4, 5, 6).pipe(
concatMap(number => api.get('/double', { number }))
);
......将始终产生:
[2, 4, 6, 8, 10, 12]
这正是我们想要的,但是现在 * 请求不会并行运行 *。concatMap
等效于mergeMap
,其中concurrency
参数设置为1
。
- 回到问题上**有没有可能获得
mergeMap
的好处,从而可以并行运行给定数量的请求,同时仍然按照原始顺序发出Map值?
- 回到问题上**有没有可能获得
∮我的具体问题∮
以上是对问题的抽象描述,当你知道手头的实际问题时,有时候推理问题会更容易一些,所以现在开始:
1.我有一个必须发货的订单列表:
const orderNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
1.我有一个shipOrder
方法,它实际运送订单,返回一个Promise
:
const shipOrder = orderNumber => api.shipOrder(orderNumber);
- API最多只能同时处理5个订单发货,因此我使用
mergeMap
来处理这个问题:
from(orderNumbers).pipe(
mergeMap(orderNumber => shipOrder(orderNumber), 5)
);
1.订单发货后,我们需要打印发货标签,我有一个printShippingLabel
函数,给定发货订单的订单号,它将打印发货标签,所以我订阅了observable,并在值出现时打印发货标签:
from(orderNumbers)
.pipe(mergeMap(orderNumber => shipOrder(orderNumber), 5))
.pipe(orderNumber => printShippingLabel(orderNumber));
1.这是可行的,但是现在运输标签打印顺序不对,因为mergeMap
根据shipOrder
完成请求的时间发出值。我想要的是标签以与原始列表相同的顺序打印。
这可能吗?
可视化
请参见此处以查看问题的可视化:https://codepen.io/JosephSilber/pen/YzwVYZb?editors=1010
您可以看到,较早的订单甚至在较晚的订单发运之前就已打印。
5条答案
按热度按时间h4cxqtbf1#
一个复杂的解决方案
1.创建一个自定义运算符,它接受具有索引键(* Typescript parlance * 中的
{ index: number }
)的值,并保留这些值的缓冲区,仅根据它们的index
顺序发出它们。1.将原始列表Map到嵌入了
index
的对象列表中。1.将其传递给我们的自定义
sortByIndex
运算符。1.将值Map回其原始值。
下面是
sortByIndex
的外观:有了
sortByIndex
运算符,我们现在可以完成整个管道:创建
concurrentConcat
运算符实际上,有了这个
sortByIndex
操作符,我们现在可以创建一个通用的concurrentConcat
操作符,它将在内部执行与{ index: number, value: T }
类型之间的转换:然后我们可以使用
concurrentConcat
操作符代替mergeMap
,并且它现在将按照原始顺序发出值:为了解决我最初的订单发货问题:
您可以看到,即使较晚的订单可能在较早的订单之前发货,标签也始终按其原始订单打印。
结论
这个解决方案甚至还不完整(因为它不处理发出多个值的内部可观察对象),但它需要一堆自定义代码。这是一个如此常见的问题,以至于我觉得必须有一个更简单(内置)的方法来解决这个问题:|
dnph8jn42#
您可以使用此运算符:一个月一个月,一个月一个月。
js5cn81o3#
于2022年4月3日更新
将
concatMap((v) => of(v).pipe(delayWhen(() => pipeNotifier))),
替换为zipWith
伪代码
结果编号
https://youtu.be/NEr6qfPlahY
代码
https://stackblitz.com/edit/js-fpds79
nukf8bse4#
你想要的是:
说明:
管道中的第一个操作符是map,它立即为每个值调用shipOrder(因此后续值可能会启动并行请求)。
第二个运算符concatAll将解析的值按正确的顺序排列。
(我简化了代码;concatAll()等效于concatMap(标识)。)
ipakzgxi5#
使用扫描缓冲结果,并在结果可用时按顺序发出它们。