Akka 流选项输出
Akka Stream Option output
我创建了一个 Akka Stream,它有一个简单的 Source
、Flow
和 Sink
。有了这个,我可以轻松地通过它发送元素。现在我想更改此流,以便 Flow
returns 成为 Option
。根据 Option
的结果,我想更改 Flow
.
的输出
是否可以创建这样的结构?
您可以将具有 2 个接收器的流视为本身是一个接收器。要构造更复杂的图形,我们可以使用 GraphDSL.
中提供的函数
在一般情况下考虑
def splittingSink[T, M1, M2, Mat](f: T ⇒ Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) ⇒ Mat): Sink[T, Mat] = {
val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder ⇒
(sink1, sink2) ⇒ {
import GraphDSL.Implicits._
//Here we broadcast the Some[T] values to 2 flows,
// each filtering to the correct type for each sink
val bcast = builder.add(Broadcast[Option[T]](2))
bcast.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> sink1.in
bcast.out(1) ~> Flow[Option[T]].collect { case None ⇒ None } ~> sink2.in
//The flow that maps T => Some[T]
val mapper = builder.add(Flow.fromFunction(f))
mapper.out ~> bcast.in
//The whole thing is a Sink[T]
SinkShape(mapper.in)
}
}
Sink.fromGraph(graph)
}
This returns a Sink[T,Mat]
使用提供的函数,将传入的 T
元素映射到 Option[T]
,然后指向其中一个提供水槽。
用法示例:
val sink = splittingSink(
(s: String) ⇒ if (s.length % 2 == 0) Some(s) else None,
Sink.foreach[String](s),
Sink.foreach[None.type](_ ⇒ println("None")),
(f1: Future[_], f2: Future[_]) ⇒ Future.sequence(Seq(f1, f2)).map(_ ⇒ Done)
)
Source(List("One", "Two", "Three", "Four", "Five", "Six"))
.runWith(sink)
.onComplete(_ ⇒ println("----\nDone"))
输出:
None
None
None
Four
Five
None
----
Done
有关 Stream Graphs 的文档部分进一步讨论了 GraphDSL 的使用。
假设你有类似的东西
val source = Source(1 to 100)
val flow = Flow[Int].map {
case x if x % 2 == 0 ⇒ Some(x.toString)
case _ ⇒ None
}
val sink1 = Sink.foreach[String](println)
val sink2 = Sink.foreach[None.type](x ⇒ println("dropped element"))
您可以制作具有所需结构的可运行图,如下所示:
val runnable = source
.via(flow)
.alsoTo(Flow[Option[String]].collect { case None ⇒ None }.to(sink2))
.to(Flow[Option[String]].collect { case Some(x) ⇒ x }.to(sink1))
此时给出的两个答案都涉及Broadcast
。请注意,它可能适用于这个特定示例,但在更复杂的图形中,Broadcast
可能不是一个明智的选择。
原因是如果至少有一个下游背压,Broadcast
总是背压。
最好的背压感知解决方案是 Partition
,它能够从 Partitioner 函数选择的分支选择性地传播背压。
下面的示例(详细说明了 T-Fowl 的回答之一)
def splittingSink[T, M1, M2, Mat](f: T ⇒ Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) ⇒ Mat): Sink[T, Mat] = {
val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder ⇒
(sink1, sink2) ⇒ {
import GraphDSL.Implicits._
def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1)
val partition = builder.add(Partition[Option[T]](2, partitioner))
partition.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> sink1.in
partition.out(1) ~> Flow[Option[T]].collect { case None ⇒ None } ~> sink2.in
val mapper = builder.add(Flow.fromFunction(f))
mapper.out ~> partition.in
SinkShape(mapper.in)
}
}
Sink.fromGraph(graph)
}
我创建了一个 Akka Stream,它有一个简单的 Source
、Flow
和 Sink
。有了这个,我可以轻松地通过它发送元素。现在我想更改此流,以便 Flow
returns 成为 Option
。根据 Option
的结果,我想更改 Flow
.
是否可以创建这样的结构?
您可以将具有 2 个接收器的流视为本身是一个接收器。要构造更复杂的图形,我们可以使用 GraphDSL.
中提供的函数在一般情况下考虑
def splittingSink[T, M1, M2, Mat](f: T ⇒ Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) ⇒ Mat): Sink[T, Mat] = {
val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder ⇒
(sink1, sink2) ⇒ {
import GraphDSL.Implicits._
//Here we broadcast the Some[T] values to 2 flows,
// each filtering to the correct type for each sink
val bcast = builder.add(Broadcast[Option[T]](2))
bcast.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> sink1.in
bcast.out(1) ~> Flow[Option[T]].collect { case None ⇒ None } ~> sink2.in
//The flow that maps T => Some[T]
val mapper = builder.add(Flow.fromFunction(f))
mapper.out ~> bcast.in
//The whole thing is a Sink[T]
SinkShape(mapper.in)
}
}
Sink.fromGraph(graph)
}
This returns a Sink[T,Mat]
使用提供的函数,将传入的 T
元素映射到 Option[T]
,然后指向其中一个提供水槽。
用法示例:
val sink = splittingSink(
(s: String) ⇒ if (s.length % 2 == 0) Some(s) else None,
Sink.foreach[String](s),
Sink.foreach[None.type](_ ⇒ println("None")),
(f1: Future[_], f2: Future[_]) ⇒ Future.sequence(Seq(f1, f2)).map(_ ⇒ Done)
)
Source(List("One", "Two", "Three", "Four", "Five", "Six"))
.runWith(sink)
.onComplete(_ ⇒ println("----\nDone"))
输出:
None
None
None
Four
Five
None
----
Done
有关 Stream Graphs 的文档部分进一步讨论了 GraphDSL 的使用。
假设你有类似的东西
val source = Source(1 to 100)
val flow = Flow[Int].map {
case x if x % 2 == 0 ⇒ Some(x.toString)
case _ ⇒ None
}
val sink1 = Sink.foreach[String](println)
val sink2 = Sink.foreach[None.type](x ⇒ println("dropped element"))
您可以制作具有所需结构的可运行图,如下所示:
val runnable = source
.via(flow)
.alsoTo(Flow[Option[String]].collect { case None ⇒ None }.to(sink2))
.to(Flow[Option[String]].collect { case Some(x) ⇒ x }.to(sink1))
此时给出的两个答案都涉及Broadcast
。请注意,它可能适用于这个特定示例,但在更复杂的图形中,Broadcast
可能不是一个明智的选择。
原因是如果至少有一个下游背压,Broadcast
总是背压。
最好的背压感知解决方案是 Partition
,它能够从 Partitioner 函数选择的分支选择性地传播背压。
下面的示例(详细说明了 T-Fowl 的回答之一)
def splittingSink[T, M1, M2, Mat](f: T ⇒ Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) ⇒ Mat): Sink[T, Mat] = {
val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder ⇒
(sink1, sink2) ⇒ {
import GraphDSL.Implicits._
def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1)
val partition = builder.add(Partition[Option[T]](2, partitioner))
partition.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> sink1.in
partition.out(1) ~> Flow[Option[T]].collect { case None ⇒ None } ~> sink2.in
val mapper = builder.add(Flow.fromFunction(f))
mapper.out ~> partition.in
SinkShape(mapper.in)
}
}
Sink.fromGraph(graph)
}