通过 observable 本身控制 observable 缓冲
Controlling observable buffering by observable itself
我正在尝试自己对可观察流进行切片,例如:
val source = Observable.from(1 to 10).share
val boundaries = source.filter(_ % 3 == 0)
val result = source.tumblingBuffer(boundaries)
result.subscribe((buf) => println(buf.toString))
输出为:
Buffer()
Buffer()
Buffer()
Buffer()
source
可能在 boundaries
行上迭代,在它到达 result
之前,因此它只创建边界和结果缓冲区,但没有任何内容可填充。
我的方法是使用 publish
/connect
:
val source2 = Observable.from(1 to 10).publish
val boundaries2 = source2.filter(_ % 3 == 0)
val result2 = source2.tumblingBuffer(boundaries2)
result2.subscribe((buf) => println(buf.toString))
source2.connect
这会产生正确的输出:
Buffer(1, 2)
Buffer(3, 4, 5)
Buffer(6, 7, 8)
Buffer(9, 10)
现在我只需要对外部世界隐藏 connect
并在 result
订阅时 connect
它(我在 class 中这样做并且我不不想暴露它)。类似于:
val source3 = Observable.from(1 to 10).publish
val boundaries3 = source3.filter(_ % 3 == 0)
val result3 = source3
.tumblingBuffer(boundaries3)
.doOnSubscribe(() => source3.connect)
result3.subscribe((buf) => println(buf.toString))
但是现在,doOnSubscribe
操作从未被调用所以发布 source
从未被连接...
怎么了?
您的 publish
解决方案走在了正确的轨道上。然而,还有一个替代的 publish
运算符,它接受一个 lambda 作为它的 Observable[T] => Observable[R]
类型的参数 (see documentation)。此 lambda 的参数是原始流,您可以安全地多次订阅它。在 lambda 中,您可以根据自己的喜好转换原始流;在您的情况下,您过滤流 并 在该过滤器上缓冲它。
Observable.from(1 to 10)
.publish(src => src.tumblingBuffer(src.filter(_ % 3 == 0)))
.subscribe(buf => println(buf.toString()))
这个运算符的最大好处是你不需要在之后调用任何类似 connect
的东西。
我正在尝试自己对可观察流进行切片,例如:
val source = Observable.from(1 to 10).share
val boundaries = source.filter(_ % 3 == 0)
val result = source.tumblingBuffer(boundaries)
result.subscribe((buf) => println(buf.toString))
输出为:
Buffer()
Buffer()
Buffer()
Buffer()
source
可能在 boundaries
行上迭代,在它到达 result
之前,因此它只创建边界和结果缓冲区,但没有任何内容可填充。
我的方法是使用 publish
/connect
:
val source2 = Observable.from(1 to 10).publish
val boundaries2 = source2.filter(_ % 3 == 0)
val result2 = source2.tumblingBuffer(boundaries2)
result2.subscribe((buf) => println(buf.toString))
source2.connect
这会产生正确的输出:
Buffer(1, 2)
Buffer(3, 4, 5)
Buffer(6, 7, 8)
Buffer(9, 10)
现在我只需要对外部世界隐藏 connect
并在 result
订阅时 connect
它(我在 class 中这样做并且我不不想暴露它)。类似于:
val source3 = Observable.from(1 to 10).publish
val boundaries3 = source3.filter(_ % 3 == 0)
val result3 = source3
.tumblingBuffer(boundaries3)
.doOnSubscribe(() => source3.connect)
result3.subscribe((buf) => println(buf.toString))
但是现在,doOnSubscribe
操作从未被调用所以发布 source
从未被连接...
怎么了?
您的 publish
解决方案走在了正确的轨道上。然而,还有一个替代的 publish
运算符,它接受一个 lambda 作为它的 Observable[T] => Observable[R]
类型的参数 (see documentation)。此 lambda 的参数是原始流,您可以安全地多次订阅它。在 lambda 中,您可以根据自己的喜好转换原始流;在您的情况下,您过滤流 并 在该过滤器上缓冲它。
Observable.from(1 to 10)
.publish(src => src.tumblingBuffer(src.filter(_ % 3 == 0)))
.subscribe(buf => println(buf.toString()))
这个运算符的最大好处是你不需要在之后调用任何类似 connect
的东西。