如何处理java.util.NoSuchElementException:减少空流akka流?

cvxl0en2  于 2022-11-05  发布在  Java
关注(0)|答案(2)|浏览(196)

我有一个过滤器来过滤掉流中不正确的项。在某些边缘情况下,这可能会导致所有项都被过滤掉。当发生这种情况时,运行reduce时流会失败,错误为-java.util.NoSuchElementException: reduce over empty stream
如何处理这种情况以返回有意义的响应?
我试过像这样的监督-

val decider: Supervision.Decider = {
    case _                      => Supervision.Stop
    case e : NoSuchElementException => Supervision.stop
  }

RunnableGraph.
toMat(Sink.reduce[Int](_ + _)
.withAttributes(ActorAttributes.supervisionStrategy(decider)))(Keep.both)
.run()

我也试过recover,但似乎都不起作用。
我需要处理此案例以返回有意义的响应。
任何帮助都将不胜感激。

kuuvgm7e

kuuvgm7e1#

可能值得考虑使用Sink.fold而不是Sink.reduce

val possiblyEmpty = Source(Seq(1, 3, 5)).filter(_ % 2 == 0)
val eventualInt = possiblyEmpty.toMat(Sink.fold[Int](0)(_ + _))(Keep.right).run()

如果没有合理的零/单位元素,你可以有一些更一般化的东西:

def reducePossiblyEmpty[T](source: Source[T])(f: (T, T) => T): RunnableGraph[Future[Option[T]]] = {
  val lifted = { (x: Option[T], y: Option[T]) =>
    x.flatMap(a => y.map(f))
  }
  source.map(x => Some(x))
    .concat(Source.single(None))
    .statefulMapConcat[Option[T]] { () =>
      var emptyStream = true
      { x =>
        x match {
          case Some(x) =>
            // element from the given stream
            emptyStream = false
            List(x)

          case None =>
            // given stream completed
            if (emptyStream) {
              List(x)
            } else {
              Nil  // don't emit anything
            }
        }
      }
    }
    .toMat(Sink.reduce[Option[T]](lifted))(Keep.right)
}

如果没有元素,返回的图形将以None完成,或者以约简结果的Some完成。
编辑:您也可以只在Source/Flow上使用orElse,代替上面的.concat.statefulMapConcat

def reducePossiblyEmpty[T](source: Source[T])(f: (T, T) => T): RunnableGraph[Future[Option[T]]] = {
  val lifted = { (x: Option[T], y: Option[T]) =>
    x.flatMap(a => y.map(f))
  }

  source.map(x => Some(x))
    .orElse(Source.single(None))
    .toMat(Sink.reduce[Option[T]](lifted))(Keep.right)
}
ax6ht2ek

ax6ht2ek2#

因为你使用的是Sink.reduce[Int],所以你可以添加一个Source,它保证只有一个0元素,这样Sink.reduce[Int]就可以工作,并产生0
这里有一个例子

val zero          = Source.single(0)
val possiblyEmpty = Source(List[Int](1, 3, 5)).filter(_ % 2 == 0)
val eventualInt   = zero.merge(possiblyEmpty).toMat(Sink.reduce[Int](_ + _))(Keep.right).run()

相关问题