如何从广播的 Akka 流中获取订阅者和发布者?
How to get a Subscriber and Publisher from a broadcasted Akka stream?
在使用更复杂的图表时,我在将发布者和订阅者从我的流中移除时遇到了问题。我的目标是提供 API 发布者和订阅者以及 运行 Akka 内部流。这是我的第一次尝试,效果很好。
val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val flow = subscriberSource.to(someFunctionSink)
//create Reactive Streams Subscriber
val subscriber: Subscriber[Boolean] = flow.run()
//prints true
Source.single(true).to(Sink(subscriber)).run()
但是对于更复杂的广播图,我不确定如何取出订阅者和发布者对象?我需要部分图表吗?
val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val publisherSink = Sink.publisher[Boolean]
FlowGraph.closed() { implicit builder =>
import FlowGraph.Implicits._
val broadcast = builder.add(Broadcast[Boolean](2))
subscriberSource ~> broadcast.in
broadcast.out(0) ~> someFunctionSink
broadcast.out(1) ~> publisherSink
}.run()
val subscriber: Subscriber[Boolean] = ???
val publisher: Publisher[Boolean] = ???
当您调用 RunnableGraph.run()
时,流是 运行,结果是 运行 的 "materialized value"。
在您的简单示例中,Source.subscriber[Boolean]
的物化值是 Subscriber[Boolean]
。在您的复杂示例中,您希望将图形的多个组件的具体化值组合为元组 (Subscriber[Boolean], Publisher[Boolean])
.
的具体化值
您可以将对其物化值感兴趣的组件传递给 FlowGraph.closed()
,然后指定一个函数来组合物化值:
import akka.stream.scaladsl._
import org.reactivestreams._
val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val publisherSink = Sink.publisher[Boolean]
val graph =
FlowGraph.closed(subscriberSource, publisherSink)(Keep.both) { implicit builder ⇒
(in, out) ⇒
import FlowGraph.Implicits._
val broadcast = builder.add(Broadcast[Boolean](2))
in ~> broadcast.in
broadcast.out(0) ~> someFunctionSink
broadcast.out(1) ~> out
}
val (subscriber: Subscriber[Boolean], publisher: Publisher[Boolean]) = graph.run()
参见Scaladocs for more information about the overloads of FlowGraph.closed。
(Keep.both
是函数的缩写 (a, b) => (a, b)
)
在使用更复杂的图表时,我在将发布者和订阅者从我的流中移除时遇到了问题。我的目标是提供 API 发布者和订阅者以及 运行 Akka 内部流。这是我的第一次尝试,效果很好。
val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val flow = subscriberSource.to(someFunctionSink)
//create Reactive Streams Subscriber
val subscriber: Subscriber[Boolean] = flow.run()
//prints true
Source.single(true).to(Sink(subscriber)).run()
但是对于更复杂的广播图,我不确定如何取出订阅者和发布者对象?我需要部分图表吗?
val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val publisherSink = Sink.publisher[Boolean]
FlowGraph.closed() { implicit builder =>
import FlowGraph.Implicits._
val broadcast = builder.add(Broadcast[Boolean](2))
subscriberSource ~> broadcast.in
broadcast.out(0) ~> someFunctionSink
broadcast.out(1) ~> publisherSink
}.run()
val subscriber: Subscriber[Boolean] = ???
val publisher: Publisher[Boolean] = ???
当您调用 RunnableGraph.run()
时,流是 运行,结果是 运行 的 "materialized value"。
在您的简单示例中,Source.subscriber[Boolean]
的物化值是 Subscriber[Boolean]
。在您的复杂示例中,您希望将图形的多个组件的具体化值组合为元组 (Subscriber[Boolean], Publisher[Boolean])
.
您可以将对其物化值感兴趣的组件传递给 FlowGraph.closed()
,然后指定一个函数来组合物化值:
import akka.stream.scaladsl._
import org.reactivestreams._
val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val publisherSink = Sink.publisher[Boolean]
val graph =
FlowGraph.closed(subscriberSource, publisherSink)(Keep.both) { implicit builder ⇒
(in, out) ⇒
import FlowGraph.Implicits._
val broadcast = builder.add(Broadcast[Boolean](2))
in ~> broadcast.in
broadcast.out(0) ~> someFunctionSink
broadcast.out(1) ~> out
}
val (subscriber: Subscriber[Boolean], publisher: Publisher[Boolean]) = graph.run()
参见Scaladocs for more information about the overloads of FlowGraph.closed。
(Keep.both
是函数的缩写 (a, b) => (a, b)
)