NodeJS 如何在rxjs上每1秒产生价值

mznpcxlj  于 2023-06-22  发布在  Node.js
关注(0)|答案(3)|浏览(116)

如果我已经有了一个observable,那么我应该使用什么操作符来使这个observable产生值,比如,每1秒?

// this just an example, In my project, I can't control when the 
// observable will produce value. I can assume that it will produce
// value very fast.
const obs = from([1,2,3,4,5]);

OBS会发出值1,2,3…非常快。但是如果我想让它每1秒发出一次值呢?我的意思是,只要确保obs不会太快地产生价值?
我检查了reactivex的文档,但找不到操作员。例如,delay,它只是使价值生产延迟一段时间,但值之间的相对时间间隔被保留,而debounceTime确实周期性地产生值,但忽略该时间窗口的值。
有人能告诉我如何使可观察到的值在一段时间内产生值,而不会错过或忽略值吗?

kx1ctssn

kx1ctssn1#

你可以用一个intervalobservable来zip它,像这样:

import { zip, from, interval } from 'rxjs'

const obs = zip(
  from([1,2,3,4,5]),
  interval(1000),
  (val, i) => val // Just emit the value
)

obs.subscribe(val => console.log(val))

如果你希望第一个值立即发出,那么你可以使用timer代替interval

import { zip, from, timer } from 'rxjs'

const obs = zip(
  from([1,2,3,4,5]),
  timer(0, 1000),
  (val, i) => val // Just emit the value
)

obs.subscribe(val => console.log(val))

如果你喜欢,你也可以使用管道,像这样:

import { from, interval } from 'rxjs'
import { zip } from 'rxjs/operators'

const obs = from([1,2,3,4,5])
  .pipe(
    zip(interval(1000), val => val)
  )

obs.subscribe(val => console.log(val))

更新

zip运算符已替换为zipWith,它没有resultSelector参数。zip将在v8中删除。
因此,上面使用zip运算符的示例可以更新如下:

import { from, interval } from 'rxjs'
import { map, zipWith } from 'rxjs/operators'

const obs = from([1,2,3,4,5])
  .pipe(
    zipWith(interval(1000)),
    map(val => val[0])
  )

obs.subscribe(val => console.log(val))
tktrz96b

tktrz96b2#

你真的可以使用delay,但是你必须把每个延迟的值变成一个Observable,然后用concatMap把它们连接成序列。

from([1,2,3,4,5]).pipe(
  concatMap(v => of(v).pipe(delay(1000))),
)

您可以使用zip,但它只适用于测试数据。zip仅在所有源Observable发出相同数量的项目时才会发出。这意味着如果一个Observable发射得非常快,然后慢下来,然后第二个Observable开始发射得很快,它会比1s延迟发射得更快。即使当intervalzip合并时也会发生这种情况,因为zip在内部缓冲所有值,因此它可能会堆叠来自interval的多个发射,然后如果第二个Observable发射得非常快,则会立即重新发射所有这些发射。

ttcibm8c

ttcibm8c3#

你可以试试这种东西

var source = Rx.Observable
  .range(1, 10)
  .concatMap(function (x) {
    return Rx.Observable
      .of(x)
      .delay(1000);
  })
 .timeInterval();

相关问题