typescript 将数组的Rxjs流转换为单个值,并在流的末尾重复

6ojccjat  于 2023-01-27  发布在  TypeScript
关注(0)|答案(2)|浏览(96)

我有一个observable,它从API获取一个项目数组(每次32个),并发出一个新的响应,直到没有项目可供获取。
我想一个接一个地处理项目清单说,只要我得到的第一批,直到我与所有项目提取完成。
当我完成了完整的列表,我想无限地重复这个过程。
以下是我目前掌握的情况:

_dataService
      .getItemsObservable()
      .pipe(
        switchMap((items) => {
          const itemList = items.map((i) => i.itemId);
          return of(itemList);
        }),
        concatMap((item) =>
            from(item).pipe(
              concatMap((item) => {
                   // do something here
                }
              )
            )
         ),
         repeat()
      ).subscribe()

我能做些什么呢?现在发生的是它将循环第一批项目并忽略其余的

sd2nnvve

sd2nnvve1#

重播不会再次调用该服务,它将重用原始值。请尝试从行为主题执行switchMap,并在处理完这些值后使其发出。确实不确定为什么要将每个项都转换为concatMap的可观察项。请在发出项后处理这些项。

const { of, BehaviorSubject, switchMap, delay } = rxjs;

const _dataService = {
  getItemsObservable: () => of(Array.from({ length: 32 }, () => Math.random()))
};

const bs$ = new BehaviorSubject();

bs$.pipe(
  delay(1000), // Behavior subject are synchronous will cause a stack overflow 
  switchMap(() => _dataService.getItemsObservable())
).subscribe(values => {
  values.forEach(val => {
    console.log('Doing stuff with ' + val);
  });
  bs$.next();
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js" integrity="sha512-v0/YVjBcbjLN6scjmmJN+h86koeB7JhY4/2YeyA5l+rTdtKLv0VbDBNJ32rxJpsaW1QGMd1Z16lsLOSGI38Rbg==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
xqk2d5yq

xqk2d5yq2#

我有一个observable,它从API获取一个项目数组(每次32个),并发出一个新的响应,直到没有项目可供获取。
好吧,我猜是_dataService.getItemsObservable()
我想处理这份清单
这意味着什么?如何处理?假设您有一个名为processItemById的函数,它处理一个itemId并返回处理后的项目。
一个接一个,只要我得到第一批,直到我完成了所有项目提取。
听起来像是要把Observable<T[]>转换成Observable<T>。你可以使用mergeMap(不关心顺序)或concatMap(保持顺序)来完成这个操作。因为你只是在扁平化一个内部数组,所以它们在本例中是相同的。

_dataService.getItemsObservable().pipe(
  mergeMap(v => v),
  map(item => processItemById(item.itemId)),
  // Repeat here doesn't call `getItemsObservable()` again,
  // instead it re-subscribes to the observable that was returned.
  // Hopefully that's what you're counting on. It's not clear to me
  repeat()
).subscribe(processedItemOutput => {
  // Do something with the output?
});

你知道我能做什么吗?
从你的解释和代码来看,你不清楚你要做什么。也许这会有所帮助。
现在,它将循环遍历第一批项并忽略其余项
发生这种情况的原因有很多。

尖端1

RxJS::of中使用高阶Map操作符会有代码味道,只需使用常规Map即可。
例如:

concatMap(v => of(fn(v)))
// or
switchMap(v => of(fn(v)))

等同于:

map(v => fn(v))

尖端2

我不知道这是否对你有帮助,但你可以通过使用延迟算子在每个订阅上生成一个新的可观察值。
例如:

defer(() => _dataService.getItemsObservable()).pipe(
  mergeMap(v => v),
  map(item => processItemById(item.itemId)),
  repeat()
).subscribe(processedItemOutput => {
  // Do something with the output?
});

相关问题