不能将 reactivestream 订阅者与 akka 流源一起使用
Can't use reactivestream Subscriber with akka stream sources
我正在尝试将 reactivestream 订阅者附加到 akka 源。
我的来源似乎在一个简单的接收器(如 foreach)上工作良好 - 但如果我放入一个真正的接收器,由订阅者制作,我什么也得不到。
我的背景是:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.{Subscriber, Subscription}
implicit val system = ActorSystem.create("test")
implicit val materializer = ActorMaterializer.create(system)
class PrintSubscriber extends Subscriber[String] {
override def onError(t: Throwable): Unit = {}
override def onSubscribe(s: Subscription): Unit = {}
override def onComplete(): Unit = {}
override def onNext(t: String): Unit = {
println(t)
}
}
我的测试用例是:
val subscriber = new PrintSubscriber()
val sink = Sink.fromSubscriber(subscriber)
val source2 = Source.fromIterator(() => Iterator("aaa", "bbb", "ccc"))
val source1 = Source.fromIterator(() => Iterator("xxx", "yyy", "zzz"))
source1.to(sink).run()(materializer)
source2.runForeach(println)
我得到输出:
aaa
bbb
ccc
为什么我没有得到 xxx、yyy 和 zzz?
下面引用 Subscriber
的 Reactive Streams 规范:
Will receive call to onSubscribe(Subscription) once after passing an
instance of Subscriber to Publisher.subscribe(Subscriber). No further
notifications will be received until Subscription.request(long) is
called.
要让某些项目流向您的订阅者,您可以做的最小更改是
override def onSubscribe(s: Subscription): Unit = {
s.request(3)
}
但是,请记住,这不会使其完全符合 Reactive Streams specs。 not-so-easy 实施是 Akka-Streams 本身等更高级别工具包背后的主要原因。
我正在尝试将 reactivestream 订阅者附加到 akka 源。
我的来源似乎在一个简单的接收器(如 foreach)上工作良好 - 但如果我放入一个真正的接收器,由订阅者制作,我什么也得不到。
我的背景是:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.{Subscriber, Subscription}
implicit val system = ActorSystem.create("test")
implicit val materializer = ActorMaterializer.create(system)
class PrintSubscriber extends Subscriber[String] {
override def onError(t: Throwable): Unit = {}
override def onSubscribe(s: Subscription): Unit = {}
override def onComplete(): Unit = {}
override def onNext(t: String): Unit = {
println(t)
}
}
我的测试用例是:
val subscriber = new PrintSubscriber()
val sink = Sink.fromSubscriber(subscriber)
val source2 = Source.fromIterator(() => Iterator("aaa", "bbb", "ccc"))
val source1 = Source.fromIterator(() => Iterator("xxx", "yyy", "zzz"))
source1.to(sink).run()(materializer)
source2.runForeach(println)
我得到输出:
aaa
bbb
ccc
为什么我没有得到 xxx、yyy 和 zzz?
下面引用 Subscriber
的 Reactive Streams 规范:
Will receive call to onSubscribe(Subscription) once after passing an instance of Subscriber to Publisher.subscribe(Subscriber). No further notifications will be received until Subscription.request(long) is called.
要让某些项目流向您的订阅者,您可以做的最小更改是
override def onSubscribe(s: Subscription): Unit = {
s.request(3)
}
但是,请记住,这不会使其完全符合 Reactive Streams specs。 not-so-easy 实施是 Akka-Streams 本身等更高级别工具包背后的主要原因。