是否有更高级别的方法来编写自定义 GraphStage?
Is there a higher-level way to write a custom GraphStage?
我正在尝试了解 Akka Streams 的一些新内容。我有一个自定义的 FanOutShape2 形状,它做的事情非常简单:输入 (Boolean, Option[A_Thing]) 并决定是否将流路由到 out0 或 out1(通过或失败),如下所示:
object PassFilter {
type FilterShape = FanOutShape2[(Boolean, Option[OutputWrapper]), OutputWrapper, akka.NotUsed]
}
import PassFilter._
case class PassFilter()(implicit asys: ActorSystem) extends GraphStage[FilterShape] {
val mergedIn: Inlet[(Boolean, Option[OutputWrapper])] = Inlet("Merged")
val outPass: Outlet[OutputWrapper] = Outlet("Pass")
val outFail: Outlet[akka.NotUsed] = Outlet("Fail")
override val shape: FilterShape = new FanOutShape2(mergedIn, outPass, outFail)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
override def preStart(): Unit = pull(mergedIn)
setHandler(mergedIn, new InHandler {
override def onPush(): Unit = {
val (passedPrivacy, outWrapper) = grab(mergedIn)
if (!passedPrivacy || outWrapper.isEmpty)
push(outFail, akka.NotUsed)
else
push(outPass, outWrapper.get)
pull(mergedIn)
}
override def onUpstreamFinish(): Unit = {} // necessary for some reason!
})
setHandler(outPass, eagerTerminateOutput)
setHandler(outFail, eagerTerminateOutput)
}
}
这个基本想法可行,我可以看到它让我有可能完全控制流程,但是对于这个超级琐碎的决策逻辑,有没有更简单、更高级的方法来创建 "widget" 可以包含在我的 GraphDSL 中吗?
此答案基于 akka-stream
版本 2.4.2-RC1
。 API 在其他版本中可能略有不同。依赖可以被 sbt:
消费
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2-RC1"
您可以轻松创建自己的逻辑抽象。您可以将其添加为类型参数,而不是对 PassFilter
class 的类型进行硬编码。而不是对确定输出端口的函数进行硬编码,您可以将其作为构造函数参数传递。通过这样做,您将收到一个可以连接到任意流的可重用组件。幸运的是,Akka 已经提供了这样的组件。它被称为 Partition
:
val shape = GraphDSL.create() { implicit b ⇒
import GraphDSL.Implicits._
val first = b.add(Sink.foreach[Int](elem ⇒ println("even:\t" + elem)))
val second = b.add(Sink.foreach[Int](elem ⇒ println("odd:\t" + elem)))
val p = b.add(Partition[Int](2, elem ⇒ if (elem%2 == 0) 0 else 1))
p ~> first
p ~> second
SinkShape(p.in)
}
Source(1 to 5).to(shape).run()
/*
This should print:
odd: 1
even: 2
odd: 3
even: 4
odd: 5
*/
Partition
组件还采用多个输出端口作为参数,使其更具可重用性。使用 ~>
符号,您可以将输出端口连接到其他组件,我在示例中就是这样做的。您当然可以忽略它,并且 return 两个输出端口都通过 FanOutShape2
组件。
我正在尝试了解 Akka Streams 的一些新内容。我有一个自定义的 FanOutShape2 形状,它做的事情非常简单:输入 (Boolean, Option[A_Thing]) 并决定是否将流路由到 out0 或 out1(通过或失败),如下所示:
object PassFilter {
type FilterShape = FanOutShape2[(Boolean, Option[OutputWrapper]), OutputWrapper, akka.NotUsed]
}
import PassFilter._
case class PassFilter()(implicit asys: ActorSystem) extends GraphStage[FilterShape] {
val mergedIn: Inlet[(Boolean, Option[OutputWrapper])] = Inlet("Merged")
val outPass: Outlet[OutputWrapper] = Outlet("Pass")
val outFail: Outlet[akka.NotUsed] = Outlet("Fail")
override val shape: FilterShape = new FanOutShape2(mergedIn, outPass, outFail)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
override def preStart(): Unit = pull(mergedIn)
setHandler(mergedIn, new InHandler {
override def onPush(): Unit = {
val (passedPrivacy, outWrapper) = grab(mergedIn)
if (!passedPrivacy || outWrapper.isEmpty)
push(outFail, akka.NotUsed)
else
push(outPass, outWrapper.get)
pull(mergedIn)
}
override def onUpstreamFinish(): Unit = {} // necessary for some reason!
})
setHandler(outPass, eagerTerminateOutput)
setHandler(outFail, eagerTerminateOutput)
}
}
这个基本想法可行,我可以看到它让我有可能完全控制流程,但是对于这个超级琐碎的决策逻辑,有没有更简单、更高级的方法来创建 "widget" 可以包含在我的 GraphDSL 中吗?
此答案基于 akka-stream
版本 2.4.2-RC1
。 API 在其他版本中可能略有不同。依赖可以被 sbt:
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2-RC1"
您可以轻松创建自己的逻辑抽象。您可以将其添加为类型参数,而不是对 PassFilter
class 的类型进行硬编码。而不是对确定输出端口的函数进行硬编码,您可以将其作为构造函数参数传递。通过这样做,您将收到一个可以连接到任意流的可重用组件。幸运的是,Akka 已经提供了这样的组件。它被称为 Partition
:
val shape = GraphDSL.create() { implicit b ⇒
import GraphDSL.Implicits._
val first = b.add(Sink.foreach[Int](elem ⇒ println("even:\t" + elem)))
val second = b.add(Sink.foreach[Int](elem ⇒ println("odd:\t" + elem)))
val p = b.add(Partition[Int](2, elem ⇒ if (elem%2 == 0) 0 else 1))
p ~> first
p ~> second
SinkShape(p.in)
}
Source(1 to 5).to(shape).run()
/*
This should print:
odd: 1
even: 2
odd: 3
even: 4
odd: 5
*/
Partition
组件还采用多个输出端口作为参数,使其更具可重用性。使用 ~>
符号,您可以将输出端口连接到其他组件,我在示例中就是这样做的。您当然可以忽略它,并且 return 两个输出端口都通过 FanOutShape2
组件。