如何在流 Akka Streams 中重用相同的 flowShape

How to reuse the same flowShape inside a flow Akka Streams

我有一个连接几个 FlowShape 的 Flow,它看起来像这样:

def mainFlow: Flow[MyGraphElement, MyGraphElement, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>

    val someClassifier = builder.add(checkSomething) // FlowShape[MyGraphElement,MyGraphElement]
    val filteringRouter = builder.add(partitionBySomething) // UniformFanOutShape[MyGraphElement,MyGraphElement]
    val mlRouter = builder.add(partitionBySomethinfElse()) // UniformFanOutShape[MyGraphElement,MyGraphElement]
    val publishToSnsFlow = builder.add(publishEvidenceToSns()) // FlowShape[MyGraphElement,MyGraphElement]

    val updateTaskStatusDoneFlow1 = builder.add(updateTaskStatus()) // FlowShape[MyGraphElement,MyGraphElement]
    val updateTaskStatusDoneFlow2 = builder.add(updateTaskStatus())
    val updateTaskStatusDoneFlow3 = builder.add(updateTaskStatus())

    someClassifier ~> filteringRouter
    filteringRouter.out("case1") ~> publishToSnsFlow ~> updateTaskStatusDoneFlow1 ~> merge
    filteringRouter.out("case2") ~> someDeciderFlow ~> mlRouter

    mlRouter.out("case5") ~> doSomethingFlow ~> updateTaskStatusDoneFlow2 ~> merge
    mlRouter.out("case4") ~> doSomethingElseFlow ~> updateTaskStatusDoneFlow3 ~> merge

    FlowShape(someClassifier.in, merge.out)
})

我的问题是我需要用不同的名称调用同一个方法 3 次,因为 FlowShap 只能在 Flow 中使用一次......或者我遗漏了一些东西,我能以不同的方式做到这一点以使其看起来更优雅?我指的是 updateTaskStatusDoneFlow1/2/3

谢谢!

先合并,然后运行通过流程如何?

val someClassifier = builder.add(checkSomething) // FlowShape[MyGraphElement,MyGraphElement]
val filteringRouter = builder.add(partitionBySomething) // UniformFanOutShape[MyGraphElement,MyGraphElement]
val mlRouter = builder.add(partitionBySomethinfElse()) // UniformFanOutShape[MyGraphElement,MyGraphElement]
val publishToSnsFlow = builder.add(publishEvidenceToSns()) // FlowShape[MyGraphElement,MyGraphElement]

val updateTaskStatusDoneFlow = builder.add(updateTaskStatus()) // FlowShape[MyGraphElement,MyGraphElement]

someClassifier ~> filteringRouter
filteringRouter.out("case1") ~> publishToSnsFlow ~> merge
filteringRouter.out("case2") ~> someDeciderFlow ~> mlRouter

mlRouter.out("case5") ~> doSomethingFlow ~> merge
mlRouter.out("case4") ~> doSomethingElseFlow ~> merge

merge.out ~> updateTaskStatusDoneFlow

FlowShape(someClassifier.in, updateTaskStatusDoneFlow.out)