Scala Monix:从观察者收集数据,foldLeft

epfja78i  于 2022-12-29  发布在  Scala
关注(0)|答案(1)|浏览(113)

通过使用Observer,我尝试构建代码,它:
1.生成一些(随机)值;
2.组合这些值;
3.如果某个人的组合值超过了阈值,那么这个值必须被传递给另一个处理程序,所以我希望得到的值能够被返回以供进一步使用
我的代码:

//generate
 val o: Observable[Int] = Observable.repeatEval(Random.nextInt(10))

 //handle 
 val f = o.foldLeft(0) { (acc, el) =>
if (acc < 15) {
  el + acc
} else {
  println("handled " + acc)
  acc
 }
}
//use handled
.flatMap{res =>
          println("mapped " + res + 1)
          Observable(res + 1)
}

但没有传递给flatMap方法。输出如下所示:

0
3
7
11
12
handled 20

我哪里做错了?

8i9zcol2

8i9zcol21#

您希望使用mapAccumulate + collect

def groupWhile[A, B](o: Observable[A], s: B)(op: (B, A) => B)(predicate: B => Boolean): Observable[B] =
  o.mapAccumulate(seed = s) {
    case (b, a) =>
      val bb = op(b, a)
      if (predicate(bb)) (bb, None) else (s, Some(bb))
  } collect {
    case Some(b) => b
  }

像这样使用它:

// Generate.
val o: Observable[Int] = Observable.repeatEval(Random.nextInt(10))

// Handle.
val f = groupWhile(o, 0)((acc, i) => acc + i)(r => r <= 15)

// Use handled.
f.mapEval { x =>
 val res = x + 1
 Task(println("Mapped: ${res}")).as(res)
}

正如我常说的,Scaladoc是您的朋友。

相关问题