Is it possible to create a Flow with Akka-stream that can switch between 2 different inner Shapes?
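I want a composite Flow that can switch between 2 different Shapes depending on the data flowing through the graph. When we return a ClosedShape the graph is static, but when we return a FlowShape I wonder whether it is possible to build some kind of dynamic routing inside it. I was looking at this question, where they seem to use a Partition, but I don't know how to apply it or whether it actually solves my problem.
I started from this example but am stuck at the comments in the code.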
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import scala.concurrent.duration._

object StreamOpenGraphsWithMultipleFlows extends App {
  run()

  def run() = {
    implicit val system = ActorSystem("StreamOpenGraphsWithMultipleFlows")

    // `1.second` avoids the postfix-operator syntax, which needs an extra language import
    val fastSource = Source(1 to 1000).throttle(50, 1.second)
    val slowSource = Source(1 to 1000).throttle(5, 1.second)
    val INC = 5
    val MULTI = 10
    val DIVIDE = 2

    val incrementer = Flow[Int].map { x =>
      val result = x + INC
      print(s" | incrementing $x + $INC -> $result")
      result
    }
    val multiplier = Flow[Int].map { x =>
      val result = x * MULTI
      print(s" | multiplying $x * $MULTI -> $result")
      result
    }
    val divider = Flow[Int].map { x =>
      val result = x / DIVIDE
      print(s" | dividing $x / $DIVIDE -> $result")
      result
    }

    def isMultipleOf(value: Int, multiple: Int): Boolean = (value % multiple) == 0

    // Step 1 - setting up the fundamental for a stream graph
    val complexFlowIncrementer = Flow.fromGraph(
      GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._

        // Step 2 - add necessary components of this graph
        val incrementerShape = builder.add(incrementer)
        val multiplierShape = builder.add(multiplier)
        // println(s"builder.materializedValue: ${builder.materializedValue}")

        // Step 3 - tying up the components
        incrementerShape ~> multiplierShape
        // BUT I WOULD LIKE TO DO SOMETHING AS BELOW
        // if (isMultipleOf(value???, 10)) incrementerShape ~> divider
        // else incrementerShape ~> multiplierShape

        // Step 4 - return the shape
        FlowShape(incrementerShape.in, multiplierShape.out)
      }
    )

    // run the graph and materialize it
    val graph = slowSource
      .via(complexFlowIncrementer)
      .to(Sink.foreach(x => println(s" | result: $x")))
    graph.run()
  }
}
This blog post shows a code example of how to achieve this. In your case, you would do something along these lines:
// additionally requires: import akka.stream.scaladsl.{Merge, Partition}
val complexFlowIncrementer = Flow.fromGraph(
  GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Step 2 - add necessary components of this graph
    val incrementerShape = builder.add(incrementer)
    val multiplierShape = builder.add(multiplier)
    val dividerShape = builder.add(divider)
    // add partition and merge: the function returns the index of the output port for each element
    val partition = builder.add(Partition[Int](2, x => if (isMultipleOf(x, 10)) 0 else 1))
    val merge = builder.add(Merge[Int](2))
    // println(s"builder.materializedValue: ${builder.materializedValue}")

    // Step 3 - tying up the components
    incrementerShape ~> partition
    partition.out(0) ~> dividerShape ~> merge.in(0)
    partition.out(1) ~> multiplierShape ~> merge.in(1)

    // Step 4 - return the shape
    FlowShape(incrementerShape.in, merge.out)
  }
)
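To check the routing end to end, here is a minimal self-contained sketch of the same Partition/Merge wiring, run over a small fixed input and collected with Sink.seq. It assumes Akka 2.6 or later (so the implicit ActorSystem provides the materializer). The names PartitionMergeSketch and runSample are hypothetical and exist only for this illustration. Note that the partition sits after the incrementer, so the multiple-of-10 test is applied to the already incremented value.

```scala
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object PartitionMergeSketch {

  // Hypothetical helper: pushes `inputs` through the routed flow and returns what comes out.
  def runSample(inputs: List[Int]): Seq[Int] = {
    // Assumption: Akka 2.6+, where an implicit ActorSystem is enough to run a stream.
    implicit val system: ActorSystem = ActorSystem("PartitionMergeSketch")

    val incrementer = Flow[Int].map(_ + 5)
    val multiplier  = Flow[Int].map(_ * 10)
    val divider     = Flow[Int].map(_ / 2)

    val routed = Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val incShape  = builder.add(incrementer)
      val mulShape  = builder.add(multiplier)
      val divShape  = builder.add(divider)
      // Port 0 receives multiples of 10, port 1 receives everything else.
      val partition = builder.add(Partition[Int](2, x => if (x % 10 == 0) 0 else 1))
      val merge     = builder.add(Merge[Int](2))

      incShape ~> partition
      partition.out(0) ~> divShape ~> merge.in(0)
      partition.out(1) ~> mulShape ~> merge.in(1)

      FlowShape(incShape.in, merge.out)
    })

    try Await.result(Source(inputs).via(routed).runWith(Sink.seq[Int]), 10.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    // Merge does not guarantee ordering across its inputs, so sort before printing.
    println(runSample(List(5, 10, 15)).sorted)
}
```

For the inputs 5, 10 and 15 the incremented values are 10, 15 and 20, so 10 and 20 take the divider branch and 15 takes the multiplier branch, giving 5, 10 and 150 once sorted. The graph itself stays static; only the per-element choice of branch is dynamic, which is what Partition provides.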