javascript RxJS mergeMap()与原始顺序

rks48beu  于 2023-02-15  发布在  Java
关注(0)|答案(5)|浏览(100)

∮抽象的问题∮
有没有什么方法可以按照外部可观测的原始顺序来消耗mergeMap的结果,同时仍然允许内部可观测并行运行?

更详细的解释

让我们来看两个合并Map运算符:

...它接受Map回调,以及可以并发运行的内部可观察对象的数量:

of(1, 2, 3, 4, 5, 6).pipe(
      mergeMap(number => api.get('/double', { number }), 3)
  );

这将分别为123发出3个并行请求,其中一个请求完成后,它将为4发出另一个请求,以此类推,始终保持3个并发请求,直到所有值都被处理完。
但是,由于先前的请求可能在后续请求之前完成,因此生成的值可能会乱序。

[2, 4, 6, 8, 10, 12]

......我们实际上可能会得到:

[4, 2, 8, 10, 6, 12] // or any other permutation

...输入concatMap。此运算符确保所有可观测值都按原始顺序连接,因此:

of(1, 2, 3, 4, 5, 6).pipe(
      concatMap(number => api.get('/double', { number }))
  );

......将始终产生:

[2, 4, 6, 8, 10, 12]

这正是我们想要的,但是现在 * 请求不会并行运行 *。
concatMap等效于mergeMap,其中concurrency参数设置为1

    • 回到问题上**有没有可能获得mergeMap的好处,从而可以并行运行给定数量的请求,同时仍然按照原始顺序发出Map值?

∮我的具体问题∮
以上是对问题的抽象描述,当你知道手头的实际问题时,有时候推理问题会更容易一些,所以现在开始:
1.我有一个必须发货的订单列表:

const orderNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

1.我有一个shipOrder方法,它实际运送订单,返回一个Promise

const shipOrder = orderNumber => api.shipOrder(orderNumber);
  1. API最多只能同时处理5个订单发货,因此我使用mergeMap来处理这个问题:
from(orderNumbers).pipe(
     mergeMap(orderNumber => shipOrder(orderNumber), 5)
 );

1.订单发货后,我们需要打印发货标签,我有一个printShippingLabel函数,给定发货订单的订单号,它将打印发货标签,所以我订阅了observable,并在值出现时打印发货标签:

from(orderNumbers)
     .pipe(mergeMap(orderNumber => shipOrder(orderNumber), 5))
     .pipe(orderNumber => printShippingLabel(orderNumber));

1.这是可行的,但是现在运输标签打印顺序不对,因为mergeMap根据shipOrder完成请求的时间发出值。我想要的是标签以与原始列表相同的顺序打印
这可能吗?

可视化

请参见此处以查看问题的可视化:https://codepen.io/JosephSilber/pen/YzwVYZb?editors=1010
您可以看到,较早的订单甚至在较晚的订单发运之前就已打印。

h4cxqtbf

h4cxqtbf1#

  • 我确实解决了部分问题,所以我把它贴在这里,作为我自己问题的答案。*
  • 我仍然非常想知道处理这种情况的规范方法 *

一个复杂的解决方案

1.创建一个自定义运算符,它接受具有索引键(* Typescript parlance * 中的{ index: number })的值,并保留这些值的缓冲区,仅根据它们的index顺序发出它们。
1.将原始列表Map到嵌入了index的对象列表中。
1.将其传递给我们的自定义sortByIndex运算符。
1.将值Map回其原始值。
下面是sortByIndex的外观:

function sortByIndex() {
    return observable => {
        return Observable.create(subscriber => {
            const buffer = new Map();
            let current = 0;
            return observable.subscribe({
                next: value => {
                    if (current != value.index) {
                        buffer.set(value.index, value);
                    } else {
                        subscriber.next(value);
                    
                        while (buffer.has(++current)) {
                            subscriber.next(buffer.get(current));
                            buffer.delete(current);
                        }
                    }
                },
                complete: value => subscriber.complete(),
            });
        });
    };
}

有了sortByIndex运算符,我们现在可以完成整个管道:

of(1, 2, 3, 4, 5, 6).pipe(
    map((number, index) => ({ number, index })),
    mergeMap(async ({ number, index }) => {
        const doubled = await api.get('/double', { number });
        return { index, number: doubled };
    }, 3),
    sortByIndex(),
    map(({ number }) => number)
);

创建concurrentConcat运算符

实际上,有了这个sortByIndex操作符,我们现在可以创建一个通用的concurrentConcat操作符,它将在内部执行与{ index: number, value: T }类型之间的转换:

function concurrentConcat(mapper, parallel) {
    return observable => {
        return observable.pipe(
            mergeMap(
                mapper,
                (_, value, index) => ({ value, index }),
                parallel
            ),
            sortByIndex(),
            map(({ value }) => value)
        );
    };
}

然后我们可以使用concurrentConcat操作符代替mergeMap,并且它现在将按照原始顺序发出值:

of(1, 2, 3, 4, 5, 6).pipe(
    concurrentConcat(number => api.get('/double', { number }), 3),
);

为了解决我最初的订单发货问题:

from(orderNumbers)
    .pipe(concurrentConcat(orderNumber => shipOrder(orderNumber), maxConcurrent))
    .subscribe(orderNumber => printShippingLabel(orderNumber));

您可以看到,即使较晚的订单可能在较早的订单之前发货,标签也始终按其原始订单打印。

结论

这个解决方案甚至还不完整(因为它不处理发出多个值的内部可观察对象),但它需要一堆自定义代码。这是一个如此常见的问题,以至于我觉得必须有一个更简单(内置)的方法来解决这个问题:|

dnph8jn4

dnph8jn42#

您可以使用此运算符:一个月一个月,一个月一个月。

const DONE = Symbol("DONE");
const DONE$ = of(DONE);
const sortedMergeMap = <I, O>(
  mapper: (i: I) => ObservableInput<O>,
  concurrent = 1
) => (source$: Observable<I>) =>
  source$.pipe(
    mergeMap(
      (value, idx) =>
        concat(mapper(value), DONE$).pipe(map(x => [x, idx] as const)),
      concurrent
    ),
    scan(
      (acc, [value, idx]) => {
        if (idx === acc.currentIdx) {
          if (value === DONE) {
            let currentIdx = idx;
            const valuesToEmit = [];
            do {
              currentIdx++;
              const nextValues = acc.buffer.get(currentIdx);
              if (!nextValues) {
                break;
              }
              valuesToEmit.push(...nextValues);
              acc.buffer.delete(currentIdx);
            } while (valuesToEmit[valuesToEmit.length - 1] === DONE);
            return {
              ...acc,
              currentIdx,
              valuesToEmit: valuesToEmit.filter(x => x !== DONE) as O[]
            };
          } else {
            return {
              ...acc,
              valuesToEmit: [value]
            };
          }
        } else {
          if (!acc.buffer.has(idx)) {
            acc.buffer.set(idx, []);
          }
          acc.buffer.get(idx)!.push(value);
          if (acc.valuesToEmit.length > 0) {
            acc.valuesToEmit = [];
          }
          return acc;
        }
      },
      {
        currentIdx: 0,
        valuesToEmit: [] as O[],
        buffer: new Map<number, (O | typeof DONE)[]>([[0, []]])
      }
    ),
    mergeMap(scannedValues => scannedValues.valuesToEmit)
  );
js5cn81o

js5cn81o3#

于2022年4月3日更新

concatMap((v) => of(v).pipe(delayWhen(() => pipeNotifier))),替换为zipWith

伪代码

zipWith(subject),
map(...do async stuff...),
concatAll()

结果编号
https://youtu.be/NEr6qfPlahY

request 1
request 2
request 3
response 3
request 4
response 1
request 5
1
response 4
request 6
response 2
request 7
2
3
4
response 6
request 8
response 5
request 9
5
6
response 7
request 10
7
response 9
response 10
response 8
8
9
10

代码

https://stackblitz.com/edit/js-fpds79

import { range, Subject, from, zipWith } from 'rxjs';
import { share, map, concatAll } from 'rxjs/operators';

const pipeNotifier = new Subject();

range(1, 10)
  .pipe(
    // 1. Make Observable controlled by pipeNotifier
    zipWith(pipeNotifier),
    // 2. Submit the request
    map(([v]) =>
      from(
        (async () => {
          console.log('request', v);
          await wait();
          console.log('response', v);

          pipeNotifier.next();

          return v;
        })()
      )
    ),
    // 3. Keep order
    concatAll()
  )
  .subscribe((x) => console.log(x));

// pipeNotifier controler
range(0, 3).subscribe(() => {
  pipeNotifier.next();
});

function wait() {
  return new Promise((resolve) => {
    const random = 5000 * Math.random();
    setTimeout(() => resolve(random), random);
  });
}
nukf8bse

nukf8bse4#

你想要的是:

from(orderNumbers)
  .pipe(map(shipOrder), concatAll())
  .subscribe(printShippingLabel)

说明:

管道中的第一个操作符是map,它立即为每个值调用shipOrder(因此后续值可能会启动并行请求)。
第二个运算符concatAll将解析的值按正确的顺序排列。
(我简化了代码;concatAll()等效于concatMap(标识)。)

ipakzgxi

ipakzgxi5#

使用扫描缓冲结果,并在结果可用时按顺序发出它们。

const { delay, map, mergeMap, of, scan, switchMap } = rxjs;

const api = {
  get: (url, data) => of(data.number * 2).pipe(delay(Math.random() * 500))
};

of(1, 2, 3, 4, 5, 6).pipe(
  mergeMap((number, index) => api.get('/double', { number }).pipe(
    map(response => ({ response, index })) // We need to keep the index for ordering
  ), 3),
  scan(
    (acc, response) => {
      acc.emit = [];
      let index = acc.responses.findIndex(r => r.index > response.index);
      if (index === -1) {
        index = acc.responses.length;
      }
      acc.responses.splice(index, 0, response);
      while (acc.current === acc.responses[0]?.index) {
        acc.emit.push(acc.responses.shift());
        acc.current++;
      }
      return acc;
    },
    { current: 0, emit: [], responses: [] }
  ),
  switchMap(acc => of(...acc.emit.map(r => r.response)))
).subscribe(response => {
  console.log(response);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js" integrity="sha512-v0/YVjBcbjLN6scjmmJN+h86koeB7JhY4/2YeyA5l+rTdtKLv0VbDBNJ32rxJpsaW1QGMd1Z16lsLOSGI38Rbg==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>

相关问题