如果我解压缩一系列元组,在两个流上执行一些异步变异,然后重新压缩它们,Akka能保证流以相同的顺序重新压缩吗?
示例:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, Unzip, Zip}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
val graph: Flow[(Int, String), (Int, String), NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val unzip = builder.add(Unzip[Int, String])
val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
val zip = builder.add(Zip[Int, String])
unzip.out0 ~> increment ~> zip.in0
unzip.out1 ~> append ~> zip.in1
FlowShape(unzip.in, zip.out)
})
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val out = Source(collection.immutable.Seq((0, "a"), (1, "b"), (2, "c")))
.via(graph)
.runWith(Sink.seq)
Await.result(out, 1 second)
在这个简单的测试中,输出是Vector((1,a-x), (2,b-x), (3,c-x))
。所以情况看起来很好。但是我不确定我是否可以相信情况总是如此。
引起一点关注的是:
val unzip = builder.add(Unzip[Int, String])
val increment = builder.add(Flow[Int].mapAsync(3) { num => Future(num + 1) })
val filter = builder.add(Flow[Int].filter(_ != 2))
val append = builder.add(Flow[String].mapAsync(3) { letter => Future(s"$letter-x") })
val zip = builder.add(Zip[Int, String])
unzip.out0 ~> increment ~> filter ~> zip.in0
unzip.out1 ~> append ~> zip.in1
// output: Vector((1,a-x), (3,b-x))
即使保留了顺序,也不能保证原始元组关系会被保留。
我可以手动检查我的流,以确保没有过滤逻辑。但这样做,我能确保元组将被重新压缩,以精确的顺序,他们收到?
1条答案
按热度按时间sshcrbum1#
TL;DR是的,确实如此。来自Akka的流订购文档:
在Akka Streams中,几乎所有的运算符都保持元素的输入顺序,这意味着如果输入
{IA1,IA2,...,IAn}
“导致”输出{OA1,OA2,...,OAk}
,输入{IB1,IB2,...,IBm}
“导致”输出{OB1,OB2,...,OBl}
,并且所有IAi
都发生在所有IBi
之前,那么OAi
发生在OBi
之前。此属性甚至由
async
操作(例如mapAsync
)支持,然而存在称为mapAsyncUnordered
的未排序版本,其不保留此排序。然而,在处理多个输入流的Junctions的情况下(例如
Merge
),通常不为到达不同输入端口的元素定义输出顺序。也就是说,类似合并的操作可以在发出Bi
之前发出Ai
,并且由其内部逻辑来决定发出的元素的顺序。**然而,诸如Zip
之类的专用元素确实保证了它们的输出顺序,因为每个输出元素依赖于已经被信号通知的所有上游元素-因此在压缩的情况下的排序由该属性定义。如果您发现自己需要对扇入场景中发出的元素的顺序进行细粒度控制,请考虑使用
MergePreferred
、MergePrioritized
或GraphStage
-这将使您能够完全控制合并的执行方式。