typescript RXJS向现有可观察项添加可观察项

dbf7pr2w  于 2023-02-20  发布在  TypeScript
关注(0)|答案(2)|浏览(211)

考虑下面的类的DUMMY示例,它只包含一个主题列表和一个listen方法。

class X {
  subjects: Subject<any>[] = [new Subject<any>()];
  
  listen(): Observable<any> {
    return merge(...this.subjects).pipe();
  }
}

class Y {
   constructor() {
     new X().listen().subscribe(value => console.log('received', value));
   }
}

如果,现在,我们在名单上加一个新的主题。

class X {
  ...
  addSubject(subjectToAdd: Subject<any>): void {
    this.subjects.push(subjectToAdd);
  }
  ...
}

如果在主题subjectToAdd上发出事件,则Y类将从不记录该事件,因为它开始监听旧列表。Y不应该知道X的内部状态,它说它想要监听并且应该接收在驻留在X中的任何主题上执行的任何事件,无论该事件是在订阅时间还是稍后添加的。
注意,这只是一个虚拟的例子,我只是想知道是否有一种方法仍然可以调用listen方法,该方法可以向现有流添加可观察对象。

zmeyuzjn

zmeyuzjn1#

这里有一个愚蠢的想法,虽然它的工作(那么它是 * 真的 * 愚蠢?):创建一个可观测量流并将其展平两次。

class X {
  subjects: Subject<any>[] = [new Subject<any>()];

  observables$: Subject<Observable<any>> = new Subject();

  listen(): Observable<any> {
    const arrayOfSubjects = this.observables$.pipe(
      scan((acc, curr) => [...acc, curr], []),
      mergeMap((val) => val),
      mergeMap((val) => val) // not a typo
    );

    return merge(arrayOfSubjects);
  }

  addStream(stream: Observable<any>): void {
    this.observables$.next(stream);
  }
}

const x = new X();

x.listen().subscribe(console.log);

x.addStream(of('Hello'));

https://stackblitz.com/edit/rxjs-q2h9vm?devtoolsheight=60&file=index.ts

ou6hu8tu

ou6hu8tu2#

我做过类似的事情,其中一个方法是使用另一个Subject,即“add”操作,这个add操作可以从现有的Subject列表中创建一个新的列表。
这不是您的确切场景,但您可以根据自己的情况进行调整:

// Action Stream for adding/updating/deleting products
  private productModifiedSubject = new Subject<Action<Product>>();
  productModifiedAction$ = this.productModifiedSubject.asObservable();

上面我创建了一个Subject,它在创建、更新或删除操作时发出。(听起来好像你只有add?)
然后,它创建了一系列新的产品(你的产品将创建一系列新的主题)。

allProducts$ = merge(
    this.retrievedProducts$,
    this.productModifiedAction$
    ).pipe(
      scan((acc, value) =>
        (value instanceof Array) ? [...value] : this.modifyProducts(acc, value), [] as Product[]),
      shareReplay(1)
    );

此外,如果您将listen Observable声明为声明性的(而不是方法),那么它将在每次发出操作主题时发出。
如果你想在Stackblitz上做一些虚拟的东西,我可以调整我的示例代码,使之更接近你所寻找的。

相关问题