typescript 可观察forkJoin未触发

093gszye  于 2023-04-22  发布在  TypeScript
关注(0)|答案(7)|浏览(185)

我尝试在两个Observable上使用forkJoin。其中一个以流的形式开始...如果我直接订阅它们,我会得到一个响应,但forkJoin没有触发。有什么想法吗?

private data$: Observable<any[]>;
private statuses$: Observable<any[]>;
private queryStream = new Subject<string>();    

....

this.data$ = this.queryStream
    .startWith('')
     .flatMap(queryInput => {
            this.query = queryInput
            return this._companyService.getCompanies(this.queryRequired + ' ' + this.query, this.page, this.sort);
                })
            .share();
    
...

Observable.forkJoin(this.statuses$, this.companies$)
            .subscribe(res => {
                console.log('forkjoin');
                this._countStatus(res[0], res[1]);
            });

// This shows arrays in the console...

this.statuses$.subscribe(res => console.log(res));
this.companies$.subscribe(res => console.log(res));

// In the console
Array[9]
Array[6]
piwo6bdm

piwo6bdm1#

forkJoin仅在所有内部观测值完成时发射。如果您需要forkJoin的等价物,仅侦听来自每个源的单个发射,请使用combineLatest + take(1)

combineLatest(
  this.statuses$,
  this.companies$,
)
.pipe(
  take(1),
)
.subscribe(([statuses, companies]) => {
  console.log('forkjoin');
  this._countStatus(statuses, companies);
});

一旦两个源都发出,combineLatest将发出,take(1)将立即取消订阅。

ikfrs5lh

ikfrs5lh2#

forkJoin的一个非常常见的问题是,它要求所有源Observable至少发出一个项目,并且所有项目都必须完成。
换句话说,如果this.statuses$this.companies$不发出任何项,并且直到它们都完成,forkJoin才发出任何项。

this.statuses$.subscribe(
    res => console.log(res),
    undefined,
    () => console.log('completed'),
);
qrjkbowd

qrjkbowd3#

forkJoin不工作,所以我使用下面的代码来解决我的问题。使用mergeMap,您可以将外部订阅的结果Map到内部订阅,并根据需要订阅它。

this.statuses$.pipe(
    mergeMap(source => this.companies$.pipe(
        map(inner => [source , inner])
        )
    )
).subscribe(([e , r]) => {
    console.log(e , r);
})
kxkpmulp

kxkpmulp4#

对我来说,combineLatest运算符就是解决方案!

cyej8jka

cyej8jka5#

.pipe(take(1))作为类似于asObservable()的可观察对象的管道追加即可完成这项工作。

forkJoin({
    l0: this._svc.data$.pipe(take(1)),
    l1: this._api.getLogman1(),
    l2: this._api.getLogman2(),
    l3: this._api.getLogman3(),
})
    .pipe(
        takeUntil(this._unsubscribeAll),
    )
    .subscribe(x => {
        console.log(x);
    });
bvuwiixz

bvuwiixz6#

Observable.forkJoin([
      _someService.getUsers(),
      _someService.getCustomers(),
    ])
      .subscribe((data: [Array<User>, Array<Customer>]) => {
        let users: Array<User> = data[0];
        let customer: Array<Customer> = data[1];
      }, err => {
      });



      //someService
        getUsers():Observable<User> {
          let url = '/users';
          return this._http.get(url, headers)
            .map(res => res.json());
        }

        getCustomers():Observable<Customer> {
          let url = '/customers';
          return this._http.get(url, headers)
            .map(res => res.json());
        }
wfsdck30

wfsdck307#

正如在其他回复中提到的,forkJoin似乎不起作用,因为它没有与subscribe相同的功能。
Subscribe会在接收到next或completed事件时触发,但forkJoin只有在所有observable都completed时才会触发。
与combineLatest的区别在于,如果你的Observable发送所有next或completed事件,它将被触发,而forkJoin只会在你的Observable发送所有completed事件时触发。
因此,如果事件发送next事件,则代码将不起作用,但以下代码应该起作用:

combineLatest([this.statuses$, this.companies$])
        .subscribe(res => {
            console.log('combineLatest');
            this._countStatus(res[0], res[1]);
        });

你可以在rxjs的文档中看到,在你的例子中,combineLastest是最好的答案:https://www.learnrxjs.io/learn-rxjs/operators/combination/forkjoin
要合并的单据最新https://www.learnrxjs.io/learn-rxjs/operators/combination/combinelatest
可观察文档和您可能触发的不同事件:https://rxjs.dev/guide/observable

相关问题