扩展 Observable 发出的值

Span out values emitted by an Observable

我是ReactiveX的初学者,所以这个问题可能很简单,但我还没有通过搜索找到答案。

我有一个非常不规则地发射项目的 Observable(从每秒约 4 次到每 5 秒一次),我想确保它永远不会发射超过每秒一次的项目。我考虑过将 zip 运算符与 Observable.interval() 一起使用,但我意识到如果它在 5 秒后发出一个项目然后在不到一秒内发出 3 个项目,那么所有这些项目将在一秒钟内发出.

有什么简单的方法吗?

有几种方法可以实现您想要的效果。您最终使用什么取决于您的用例。

最后一个油门

throttle 让我们指定您只需要特定时间间隔的新值。这可能最接近您尝试使用 zipinterval:

实现的目标
myObservable.throttleLast(1, TIMEUNIT.SECOND)

这将发出每秒发出的最新信号。第二个内发出的其他信号被丢弃。

缓冲区

bufferthrottleLast 的功能大致相同,但它不会传递最近发出的值,而是 return 一个 Flowable 中发出的所有值时间跨度。

myObservable.buffer(1, TIMEUNIT.SECOND)