我有一个连接几个FlowShapes的Flow,它看起来像这样:
def mainFlow: Flow[MyGraphElement, MyGraphElement, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
val someClassifier = builder.add(checkSomething) // FlowShape[MyGraphElement,MyGraphElement]
val filteringRouter = builder.add(partitionBySomething) // UniformFanOutShape[MyGraphElement,MyGraphElement]
val mlRouter = builder.add(partitionBySomethinfElse()) // UniformFanOutShape[MyGraphElement,MyGraphElement]
val publishToSnsFlow = builder.add(publishEvidenceToSns()) // FlowShape[MyGraphElement,MyGraphElement]
val updateTaskStatusDoneFlow1 = builder.add(updateTaskStatus()) // FlowShape[MyGraphElement,MyGraphElement]
val updateTaskStatusDoneFlow2 = builder.add(updateTaskStatus())
val updateTaskStatusDoneFlow3 = builder.add(updateTaskStatus())
someClassifier ~> filteringRouter
filteringRouter.out("case1") ~> publishToSnsFlow ~> updateTaskStatusDoneFlow1 ~> merge
filteringRouter.out("case2") ~> someDeciderFlow ~> mlRouter
mlRouter.out("case5") ~> doSomethingFlow ~> updateTaskStatusDoneFlow2 ~> merge
mlRouter.out("case4") ~> doSomethingElseFlow ~> updateTaskStatusDoneFlow3 ~> merge
FlowShape(someClassifier.in, merge.out)
})
我的问题是,我需要用不同的名称调用同一个方法3次,因为FlowShap只能在一个Flow中使用一次......或者我遗漏了一些东西,我能以某种方式做一些改变,使它看起来更优雅吗?我指的是updateTaskStatusDoneFlow 1/2/3
谢谢!
1条答案
按热度按时间m1m5dgzv1#
不如先合并,再按流程运行?