如何从 monix observable 获取下一个元素?
How to get the next element from monix observable?
我有一个随便记录的数据流,但在我的程序的某个状态下,我需要流中的数据,不是到那时为止观察到的最新数据(我可以做到),而是最新的一个之后:
val dataStream : Observable[Data] = ...
dataStream
.doOnNext(logger.trace(_))
.subscribe()
// ...
// at some state of program later:
// ...
def awaitOneElement(Observable[Data]): Data = ???
val expecting: Data = awaitOneElement(dataStream.filter(...))
如何实施 def awaitOneElement(Observable[Data]): Data = ???
?
我知道这可能在惯用语上是不正确的,但肮脏的同步等待正是我所需要的。我对 Observable[Data] => Future[Data]
也很好,将在下一步用 Await
换行。
如果您只需要一个元素,可以使用 Consumer.head
或 Consumer.headOption
之类的东西:
val first: Task[Option[Int]] =
Observable.empty[Int].consumeWith(Consumer.headOption)]
现在您可以将任务转换为未来。
如果您需要所有元素,您可以将 foldLeft 与累加器一起使用:
Consumer.foldLeft[T,Vector[T]](Vector.empty[T])((vec, t) => vec :+ t )
或者在需要时实现自定义消费者和触发回调。
取自文档:
https://monix.io/docs/3x/reactive/consumer.html
更新:
对于最后一个元素,您可以使用这样的消费者:
Consumer.foldLeft[T,Option[T]](Option.empty[T])((opt, t) => Some(t))
我的解决方案:
implicit val scheduler = mx
val pendingPromise: AtomicReference[Option[Promise[A]]] = new AtomicReference(None)
val lastSubject: ConcurrentSubject[A, A] = ConcurrentSubject.publish[A]
o.doOnNext(a => if (pendingPromise.get().nonEmpty) lastSubject.onNext(a))
.subscribe()
lastSubject
.filter(filter)
.doOnNext(a => pendingPromise.getAndSet(None).map(_.success(a)))
.subscribe()
def getNext(duration: Duration): Try[A] = {
val promise = Promise[A]()
pendingPromise.set(Some(promise))
Try(Await.result(promise.future, duration))
}
}
调用 getNext 时,会创建一个 Promise
并返回 future。当期望的事件在 Observable 中发生时,promise 被填充并移除。
我有一个随便记录的数据流,但在我的程序的某个状态下,我需要流中的数据,不是到那时为止观察到的最新数据(我可以做到),而是最新的一个之后:
val dataStream : Observable[Data] = ...
dataStream
.doOnNext(logger.trace(_))
.subscribe()
// ...
// at some state of program later:
// ...
def awaitOneElement(Observable[Data]): Data = ???
val expecting: Data = awaitOneElement(dataStream.filter(...))
如何实施 def awaitOneElement(Observable[Data]): Data = ???
?
我知道这可能在惯用语上是不正确的,但肮脏的同步等待正是我所需要的。我对 Observable[Data] => Future[Data]
也很好,将在下一步用 Await
换行。
如果您只需要一个元素,可以使用 Consumer.head
或 Consumer.headOption
之类的东西:
val first: Task[Option[Int]] =
Observable.empty[Int].consumeWith(Consumer.headOption)]
现在您可以将任务转换为未来。 如果您需要所有元素,您可以将 foldLeft 与累加器一起使用:
Consumer.foldLeft[T,Vector[T]](Vector.empty[T])((vec, t) => vec :+ t )
或者在需要时实现自定义消费者和触发回调。 取自文档: https://monix.io/docs/3x/reactive/consumer.html
更新: 对于最后一个元素,您可以使用这样的消费者:
Consumer.foldLeft[T,Option[T]](Option.empty[T])((opt, t) => Some(t))
我的解决方案:
implicit val scheduler = mx
val pendingPromise: AtomicReference[Option[Promise[A]]] = new AtomicReference(None)
val lastSubject: ConcurrentSubject[A, A] = ConcurrentSubject.publish[A]
o.doOnNext(a => if (pendingPromise.get().nonEmpty) lastSubject.onNext(a))
.subscribe()
lastSubject
.filter(filter)
.doOnNext(a => pendingPromise.getAndSet(None).map(_.success(a)))
.subscribe()
def getNext(duration: Duration): Try[A] = {
val promise = Promise[A]()
pendingPromise.set(Some(promise))
Try(Await.result(promise.future, duration))
}
}
调用 getNext 时,会创建一个 Promise
并返回 future。当期望的事件在 Observable 中发生时,promise 被填充并移除。