我尝试在两个Observable上使用forkJoin
。其中一个以流的形式开始...如果我直接订阅它们,我会得到一个响应,但forkJoin
没有触发。有什么想法吗?
private data$: Observable<any[]>;
private statuses$: Observable<any[]>;
private queryStream = new Subject<string>();
....
this.data$ = this.queryStream
.startWith('')
.flatMap(queryInput => {
this.query = queryInput
return this._companyService.getCompanies(this.queryRequired + ' ' + this.query, this.page, this.sort);
})
.share();
...
Observable.forkJoin(this.statuses$, this.companies$)
.subscribe(res => {
console.log('forkjoin');
this._countStatus(res[0], res[1]);
});
// This shows arrays in the console...
this.statuses$.subscribe(res => console.log(res));
this.companies$.subscribe(res => console.log(res));
// In the console
Array[9]
Array[6]
7条答案
按热度按时间piwo6bdm1#
forkJoin
仅在所有内部观测值完成时发射。如果您需要forkJoin
的等价物,仅侦听来自每个源的单个发射,请使用combineLatest
+take(1)
一旦两个源都发出,
combineLatest
将发出,take(1)
将立即取消订阅。ikfrs5lh2#
forkJoin
的一个非常常见的问题是,它要求所有源Observable至少发出一个项目,并且所有项目都必须完成。换句话说,如果
this.statuses$
或this.companies$
不发出任何项,并且直到它们都完成,forkJoin
才发出任何项。qrjkbowd3#
forkJoin
不工作,所以我使用下面的代码来解决我的问题。使用mergeMap
,您可以将外部订阅的结果Map到内部订阅,并根据需要订阅它。kxkpmulp4#
对我来说,
combineLatest
运算符就是解决方案!cyej8jka5#
将
.pipe(take(1))
作为类似于asObservable()
的可观察对象的管道追加即可完成这项工作。bvuwiixz6#
wfsdck307#
正如在其他回复中提到的,forkJoin似乎不起作用,因为它没有与subscribe相同的功能。
Subscribe会在接收到next或completed事件时触发,但forkJoin只有在所有observable都completed时才会触发。
与combineLatest的区别在于,如果你的Observable发送所有next或completed事件,它将被触发,而forkJoin只会在你的Observable发送所有completed事件时触发。
因此,如果事件发送next事件,则代码将不起作用,但以下代码应该起作用:
你可以在rxjs的文档中看到,在你的例子中,combineLastest是最好的答案:https://www.learnrxjs.io/learn-rxjs/operators/combination/forkjoin
要合并的单据最新https://www.learnrxjs.io/learn-rxjs/operators/combination/combinelatest
可观察文档和您可能触发的不同事件:https://rxjs.dev/guide/observable