Akka Streams Unzip/Zip是否保持顺序?

s6fujrry  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(202)

如果我解压缩一系列元组,在两个流上执行一些异步变异,然后重新压缩它们,Akka能保证流以相同的顺序重新压缩吗?
示例:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, Unzip, Zip}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

val graph: Flow[(Int, String), (Int, String), NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val unzip = builder.add(Unzip[Int, String])
  val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
  val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
  val zip = builder.add(Zip[Int, String])

  unzip.out0 ~> increment ~> zip.in0
  unzip.out1 ~> append ~> zip.in1

  FlowShape(unzip.in, zip.out)
})

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val out = Source(collection.immutable.Seq((0, "a"), (1, "b"), (2, "c")))
  .via(graph)
  .runWith(Sink.seq)

Await.result(out, 1 second)

在这个简单的测试中,输出是Vector((1,a-x), (2,b-x), (3,c-x))。所以情况看起来很好。但是我不确定我是否可以相信情况总是如此。
引起一点关注的是:

val unzip = builder.add(Unzip[Int, String])
val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
val filter = builder.add(Flow[Int].filter(_ != 2))
val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
val zip = builder.add(Zip[Int, String])

unzip.out0 ~> increment ~> filter ~> zip.in0
unzip.out1 ~> append ~> zip.in1

// output: Vector((1,a-x), (3,b-x))

即使保留了顺序,也不能保证原始元组关系会被保留。
我可以手动检查我的流,以确保没有过滤逻辑。但这样做,我能确保元组将被重新压缩,以精确的顺序,他们收到?

sshcrbum

sshcrbum1#

TL;DR是的,确实如此。来自Akka的流订购文档:
在Akka Streams中,几乎所有的运算符都保持元素的输入顺序,这意味着如果输入{IA1,IA2,...,IAn}“导致”输出{OA1,OA2,...,OAk},输入{IB1,IB2,...,IBm}“导致”输出{OB1,OB2,...,OBl},并且所有IAi都发生在所有IBi之前,那么OAi发生在OBi之前。
此属性甚至由async操作(例如mapAsync)支持,然而存在称为mapAsyncUnordered的未排序版本,其不保留此排序。
然而,在处理多个输入流的Junctions的情况下(例如Merge),通常不为到达不同输入端口的元素定义输出顺序。也就是说,类似合并的操作可以在发出Bi之前发出Ai,并且由其内部逻辑来决定发出的元素的顺序。**然而,诸如Zip之类的专用元素确实保证了它们的输出顺序,因为每个输出元素依赖于已经被信号通知的所有上游元素-因此在压缩的情况下的排序由该属性定义。
如果您发现自己需要对扇入场景中发出的元素的顺序进行细粒度控制,请考虑使用MergePreferredMergePrioritizedGraphStage-这将使您能够完全控制合并的执行方式。

相关问题