我有一个observable,它从API获取一个项目数组(每次32个),并发出一个新的响应,直到没有项目可供获取。
我想一个接一个地处理项目清单说,只要我得到的第一批,直到我与所有项目提取完成。
当我完成了完整的列表,我想无限地重复这个过程。
以下是我目前掌握的情况:
_dataService
.getItemsObservable()
.pipe(
switchMap((items) => {
const itemList = items.map((i) => i.itemId);
return of(itemList);
}),
concatMap((item) =>
from(item).pipe(
concatMap((item) => {
// do something here
}
)
)
),
repeat()
).subscribe()
我能做些什么呢?现在发生的是它将循环第一批项目并忽略其余的
2条答案
按热度按时间sd2nnvve1#
重播不会再次调用该服务,它将重用原始值。请尝试从行为主题执行switchMap,并在处理完这些值后使其发出。确实不确定为什么要将每个项都转换为concatMap的可观察项。请在发出项后处理这些项。
xqk2d5yq2#
我有一个observable,它从API获取一个项目数组(每次32个),并发出一个新的响应,直到没有项目可供获取。
好吧,我猜是
_dataService.getItemsObservable()
?我想处理这份清单
这意味着什么?如何处理?假设您有一个名为
processItemById
的函数,它处理一个itemId并返回处理后的项目。一个接一个,只要我得到第一批,直到我完成了所有项目提取。
听起来像是要把
Observable<T[]>
转换成Observable<T>
。你可以使用mergeMap(不关心顺序)或concatMap(保持顺序)来完成这个操作。因为你只是在扁平化一个内部数组,所以它们在本例中是相同的。你知道我能做什么吗?
从你的解释和代码来看,你不清楚你要做什么。也许这会有所帮助。
现在,它将循环遍历第一批项并忽略其余项
发生这种情况的原因有很多。
尖端1
在
RxJS::of
中使用高阶Map操作符会有代码味道,只需使用常规Map即可。例如:
等同于:
尖端2
我不知道这是否对你有帮助,但你可以通过使用延迟算子在每个订阅上生成一个新的可观察值。
例如: