我试图连接两个流,但无法解释我的实现的输出。
val source = Source(1 to 10)
val sink = Sink.foreach(println)
val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)
val flowGraph = Flow.fromGraph(
GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val concat = builder.add(Concat[Int](2))
val broadcast = builder.add(Broadcast[Int](2))
broadcast ~> flow1 ~> concat.in(0)
broadcast ~> flow2 ~> concat.in(1)
FlowShape(broadcast.in, concat.out)
}
)
source.via(flowGraph).runWith(sink)
我希望从这段代码中得到以下输出。
2
3
4
.
.
.
11
10
20
.
.
.
100
相反,我只看到“2”被打印出来。你能解释一下我的实现有什么问题吗?我应该如何改变程序以获得所需的输出。
2条答案
按热度按时间7vux5j2d1#
来自Akka Stream的API文档:
Concat
:在当前流具有可用元素时发出;如果当前输入完成,则尝试下一个输入
Broadcast
:当所有输出停止背压且有输入元件可用时发射
这两个运算符不能一起工作,因为它们的工作方式存在冲突--
Concat
试图在切换到另一个输出之前从Broadcast
的一个输出中提取所有元素,而Broadcast
不会发出,除非对它的所有输出都有需求。根据您的需要,您可以按照评论者的建议使用
concat
进行连接:或者等效地使用如下
Source.combine
:dced5bon2#
使用
GraphDSL
,它是合并实现的简化版本: