Akka Streams 入口和出口匹配
Akka Streams Inlets and Outlets match
这是我能想到的使用 Partition
和 Merge
的最简单图表,但是当 运行 时,它会出现以下错误:
requirement failed: The inlets [] and outlets [] must correspond to the inlets [Merge.in0, Merge.in1] and outlets [Partition.out0, Partition.out1]
我知道该消息表明我的输出多于输入或未连接的流,但我似乎无法在这个简单的示例中看到不匹配的地方。
感谢任何帮助。
图表:
def createGraph()(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = {
GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
import GraphDSL.Implicits._
val inputs: List[Int] = List(1, 2, 3, 4)
val source: Source[Int, NotUsed] = Source(inputs)
val messageSplit: UniformFanOutShape[Int, Int] = builder.add(Partition[Int](2, i => i%2))
val messageMerge: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))
val processEven: Flow[Int, Int, NotUsed] = Flow[Int].map(rc => {
actorSystem.log.debug(s"even: $rc")
rc
})
val processOdd: Flow[Int, Int, NotUsed] = Flow[Int].map(rc => {
actorSystem.log.debug(s"odd: $rc")
rc
})
source ~> messageSplit.in
messageSplit.out(0) -> processEven -> messageMerge.in(0)
messageSplit.out(1) -> processOdd -> messageMerge.in(1)
messageMerge.out ~> s
ClosedShape
}
}
测试:
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, RunnableGraph, Sink, Source}
import akka.{Done, NotUsed}
import org.scalatest.FunSpec
import scala.concurrent.Future
class RoomITSpec extends FunSpec {
implicit val actorSystem: ActorSystem = ActorSystem("RoomITSpec")
implicit val actorCreator: ActorMaterializer = ActorMaterializer()
describe("graph") {
it("should run") {
val graph = createGraph()
RunnableGraph.fromGraph(graph).run
}
}
}
小语法错误。
// Notice the curly arrows
messageSplit.out(0) ~> processEven ~> messageMerge.in(0)
messageSplit.out(1) ~> processOdd ~> messageMerge.in(1)
而不是你写的:
// Straight arrows
messageSplit.out(0) -> processEven -> messageMerge.in(0)
messageSplit.out(1) -> processOdd -> messageMerge.in(1)
您最终生成(并丢弃)了元组而不是添加到图形中。
这是我能想到的使用 Partition
和 Merge
的最简单图表,但是当 运行 时,它会出现以下错误:
requirement failed: The inlets [] and outlets [] must correspond to the inlets [Merge.in0, Merge.in1] and outlets [Partition.out0, Partition.out1]
我知道该消息表明我的输出多于输入或未连接的流,但我似乎无法在这个简单的示例中看到不匹配的地方。
感谢任何帮助。
图表:
def createGraph()(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = {
GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
import GraphDSL.Implicits._
val inputs: List[Int] = List(1, 2, 3, 4)
val source: Source[Int, NotUsed] = Source(inputs)
val messageSplit: UniformFanOutShape[Int, Int] = builder.add(Partition[Int](2, i => i%2))
val messageMerge: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))
val processEven: Flow[Int, Int, NotUsed] = Flow[Int].map(rc => {
actorSystem.log.debug(s"even: $rc")
rc
})
val processOdd: Flow[Int, Int, NotUsed] = Flow[Int].map(rc => {
actorSystem.log.debug(s"odd: $rc")
rc
})
source ~> messageSplit.in
messageSplit.out(0) -> processEven -> messageMerge.in(0)
messageSplit.out(1) -> processOdd -> messageMerge.in(1)
messageMerge.out ~> s
ClosedShape
}
}
测试:
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, RunnableGraph, Sink, Source}
import akka.{Done, NotUsed}
import org.scalatest.FunSpec
import scala.concurrent.Future
class RoomITSpec extends FunSpec {
implicit val actorSystem: ActorSystem = ActorSystem("RoomITSpec")
implicit val actorCreator: ActorMaterializer = ActorMaterializer()
describe("graph") {
it("should run") {
val graph = createGraph()
RunnableGraph.fromGraph(graph).run
}
}
}
小语法错误。
// Notice the curly arrows
messageSplit.out(0) ~> processEven ~> messageMerge.in(0)
messageSplit.out(1) ~> processOdd ~> messageMerge.in(1)
而不是你写的:
// Straight arrows
messageSplit.out(0) -> processEven -> messageMerge.in(0)
messageSplit.out(1) -> processOdd -> messageMerge.in(1)
您最终生成(并丢弃)了元组而不是添加到图形中。