akka 是否有方法在每个其他流之后执行任何特定流,而无需显式插入它

xtfmy6hx  于 2022-11-06  发布在  其他
关注(0)|答案(2)|浏览(183)

我有多个流程(处理从队列接收的消息)要执行,在每个流程之后,我需要检查前一个流程中是否有任何错误,如果有,则我过滤掉正在处理的消息,否则继续下一个流程。
目前,我不得不将这个错误处理程序流显式地插在每个其他流之后。有没有什么方法可以通过一些功能来实现这一点,其中这个错误流可以被配置为在每个其他流之后运行。或者有没有其他更好的方法来实现这一点?
示例:
流程1 -〉验证消息,如果错误,则将消息标记为错误
错误流-〉检查消息是否被标记为错误,如果是则过滤,否则继续。
流程2 -〉将消息保存到数据库,在出错情况下进行标记。
错误流-〉检查消息是否被标记为错误,如果是,则过滤,否则继续
流3 -〉等等。
或者有没有方法可以 Package (流1 +错误流),(流2 -〉错误流)?

h43kikqp

h43kikqp1#

我不确定这是否正是您所要求的,但我有一个解决方案。可以做的是创建所有流,例如,我们可以查看:

val flows = Seq (
  Flow.fromFunction[Int, Int](x => { println(s"flow1: Received $x"); x * 2 }),
  Flow.fromFunction[Int, Int](x => { println(s"flow2: Received $x"); x + 1}),
  Flow.fromFunction[Int, Int](x => { println(s"flow3: Received $x"); x * x})
)

然后,我们需要将错误处理添加到每个现有流程中。因此,让我们定义它,并将其添加到每个元素中:

val errorHandling = Flow[Int].filter(_ % 2 == 0)
val errorsHandledFlows = flows.map(flow => flow.via(errorHandling))

现在,我们需要一个helper函数,它将连接我们所有的新流:

def connectFlows(errorsHandledFlows: Seq[Flow[Int, Int, _]]): Flow[Int, Int, _] = {
  errorsHandledFlows match {
    case Seq() => Flow[Int] // This line is actually redundant, But I don't want to have an unexhausted pattern matching
    case Seq(singleFlow) => singleFlow
    case head :: tail => head.via(connectFlows(tail))
  }
}

现在,我们需要一起执行,例如:

Source(1 to 4).via(connectFlows(errorsHandledFlows)).to(Sink.foreach(println)).run()

将提供输出:

flow1: Received 1
flow2: Received 2
flow1: Received 2
flow2: Received 4
flow1: Received 3
flow2: Received 6
flow1: Received 4
flow2: Received 8

正如你所看到的,我们过滤了奇数,因此第一个流得到了从1到4的所有数字,第二个流得到了2、4、6、8(第一个流将这些值乘以2),而最后一个流没有得到任何流,因为第二个流使所有的值都是奇数。

xzabzqsa

xzabzqsa2#

您也可以使用合并

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val merge = builder.add(Merge[Int](3))

  val flow1 = ...
  val flow2 = ...
  val flow3 = ...

  flow1 ~> merge
  flow2 ~> merge
  flow3 ~> merge

  ClosedShape
})

不确定它是否符合你的需要,只是显示替代方案。

相关问题