如何在 Scala 中将反应式 Publisher 转换为简单的 Stream?
How to convert reactive Publisher to simple Stream in Scala?
是否可以转换 org.reactivestreams.Publisher
instance to scala.Stream
?如果可以,怎么做?
像下面这样的东西对你有用吗?
val queue: java.util.concurrent.BlockingQueue[T] = ... // TODO: choose appropriate BlockingQueue implementation
publisher.subscribe(new Subscriber[T] {
override def onNext(t: T): Unit = { queue.put(t) }
// TODO: implement other Subscriber methods
}
val stream = Stream.continually(queue.take)
是否可以转换 org.reactivestreams.Publisher
instance to scala.Stream
?如果可以,怎么做?
像下面这样的东西对你有用吗?
val queue: java.util.concurrent.BlockingQueue[T] = ... // TODO: choose appropriate BlockingQueue implementation
publisher.subscribe(new Subscriber[T] {
override def onNext(t: T): Unit = { queue.put(t) }
// TODO: implement other Subscriber methods
}
val stream = Stream.continually(queue.take)