如何在 RxJava (RxScala) 中实现 observeLatestOn?
How to implement observeLatestOn in RxJava (RxScala)?
我正在尝试在 RxJava(实际上是 RxScala)中实现 ObserveLatestOn 运算符。
当我们有一个快速的生产者和一个慢速的订阅者时,这个运算符很有用,但订阅者不关心它在消费一个项目时丢失的任何项目。
大理石图:
--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|
=
字符表示订阅者正在执行一项较长的运行工作,>
字符表示刚刚完成的工作。作为典型的用法示例,想象一些需要显示的数据的生产者,以及作为订阅者的数据的屏幕渲染器。渲染时间比较长,但是我们不需要在屏幕上渲染每一步,最后一步就完美了。
在上面的弹珠图中,生产者发信号1,订阅者开始处理,耗时较长。同时,生产者发出 2 和 3,之后订阅者才完成工作。它看到生产者发出的最后一个项目是 3,所以它开始处理它。那很快,这期间还没有新的item产生,订阅者就可以休息了。然后,5来了,故事以同样的方式继续。
我花了几个小时尝试实现这个看似简单的运算符,但我仍然不满意。运算符的本质表明它应该是异步的,它应该在与接收它们的调度程序不同的调度程序上发出它的项目。但与此同时,我当然不希望在没有工作可做的情况下让线程被工作人员占用。
这是我到目前为止的想法:
def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = {
@volatile var maybeNextItem: Option[Notification[T]] = None
@volatile var isWorkScheduled = false
val itemsQueueLock = new Object()
Observable(subscriber ⇒ {
def signalToSubscriber(materializedItem: Notification[T]): Unit = {
materializedItem match {
case OnNext(item) ⇒ subscriber onNext item
case OnError(error) ⇒ subscriber onError error
case OnCompleted ⇒ subscriber.onCompleted()
}
}
def queueItem(item: Notification[T]): Unit = {
val worker = scheduler.createWorker
val shouldScheduleWork = itemsQueueLock synchronized {
val result = !isWorkScheduled
maybeNextItem = Some(item)
isWorkScheduled = true
result
}
if (shouldScheduleWork) {
worker.scheduleRec {
val maybeNextItemToSignal = itemsQueueLock synchronized {
val result = maybeNextItem
if (result.isEmpty) {
worker.unsubscribe()
isWorkScheduled = false
}
maybeNextItem = None
result
}
maybeNextItemToSignal foreach signalToSubscriber
}
}
}
o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
next ⇒ queueItem(OnNext(next)),
error ⇒ queueItem(OnError(error)),
() ⇒ queueItem(OnCompleted)
)
})
}
这似乎可行,但我不确定是否没有竞争条件或死锁。另外,我不确定解决方案是否可以变得更简单。我也一直在考虑另一种方法,比如
- 巧妙地使用了
OperatorDebounceWithSelector
- 一次仅请求一项的可观察对象的组合,
observeOn
和 onBackpressureBuffer(1)
我也不知道如何为此编写确定性单元测试。 scheduleRec
调度的工作在与TestScheduler
一起使用时不能被打断,我需要使用一个真正在不同线程上工作的调度器。我发现很难为多线程代码的竞争条件编写正确的单元测试。
所以,问题仍然存在:我的解决方案是否正确?有没有更简单、更好或更正确的方法呢?以及如何测试它的正确性?
我建议使用 lift
来实现这个运算符。这是我的解决方案:
package object ObservableEx {
implicit class ObserveLatestOn[T](val o: Observable[T]) {
def observeLatestOn(scheduler: Scheduler): Observable[T] = {
o.lift { (child: Subscriber[T]) =>
val worker = scheduler.createWorker
child.add(worker)
val parent = new Subscriber[T] {
private val lock = new AnyRef
// protected by "lock"
private var latest: Notification[T] = null
// protected by "lock"
// Means no task runs in the worker
private var idle = true
private var done = false
override def onStart(): Unit = {
request(Long.MaxValue)
}
override def onNext(v: T): Unit = {
if (!done) {
emit(OnNext(v))
}
}
override def onCompleted(): Unit = {
if (!done) {
done = true
emit(OnCompleted)
}
}
override def onError(e: Throwable): Unit = {
if (!done) {
done = true
emit(OnError(e))
}
}
def emit(v: Notification[T]): Unit = {
var shouldSchedule = false
lock.synchronized {
latest = v
if (idle) {
// worker is idle so we should schedule a task
shouldSchedule = true
// We will schedule a task, so the worker will be busy
idle = false
}
}
if (shouldSchedule) {
worker.schedule {
var n: Notification[T] = null
var exit = false
while (!exit) {
lock.synchronized {
if (latest == null) {
// No new item arrives and we are leaving the worker, so set "idle"
idle = true
exit = true
} else {
n = latest
latest = null
}
}
if (!exit) {
n.accept(child)
}
}
}
}
}
}
child.add(parent)
parent
}
}
}
}
还有一个单元测试
import ObservableEx.ObserveLatestOn
@Test
def testObserveLatestOn(): Unit = {
val scheduler = TestScheduler()
val xs = mutable.ArrayBuffer[Long]()
var completed = false
Observable.interval(100 milliseconds, scheduler).take(10).observeLatestOn(scheduler).subscribe(v => {
scheduler.advanceTimeBy(200 milliseconds)
xs += v
},
e => e.printStackTrace(),
() => completed = true
)
scheduler.advanceTimeBy(100 milliseconds)
assert(completed === true)
assert(xs === List(0, 2, 4, 6, 8))
}
我有一个 PR,其中运算符 onBackpressureLatest()
应该具有预期的行为,但您需要并发性并且可以像往常一样使用 observeOn
。
我正在尝试在 RxJava(实际上是 RxScala)中实现 ObserveLatestOn 运算符。
当我们有一个快速的生产者和一个慢速的订阅者时,这个运算符很有用,但订阅者不关心它在消费一个项目时丢失的任何项目。
大理石图:
--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|
=
字符表示订阅者正在执行一项较长的运行工作,>
字符表示刚刚完成的工作。作为典型的用法示例,想象一些需要显示的数据的生产者,以及作为订阅者的数据的屏幕渲染器。渲染时间比较长,但是我们不需要在屏幕上渲染每一步,最后一步就完美了。
在上面的弹珠图中,生产者发信号1,订阅者开始处理,耗时较长。同时,生产者发出 2 和 3,之后订阅者才完成工作。它看到生产者发出的最后一个项目是 3,所以它开始处理它。那很快,这期间还没有新的item产生,订阅者就可以休息了。然后,5来了,故事以同样的方式继续。
我花了几个小时尝试实现这个看似简单的运算符,但我仍然不满意。运算符的本质表明它应该是异步的,它应该在与接收它们的调度程序不同的调度程序上发出它的项目。但与此同时,我当然不希望在没有工作可做的情况下让线程被工作人员占用。
这是我到目前为止的想法:
def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = {
@volatile var maybeNextItem: Option[Notification[T]] = None
@volatile var isWorkScheduled = false
val itemsQueueLock = new Object()
Observable(subscriber ⇒ {
def signalToSubscriber(materializedItem: Notification[T]): Unit = {
materializedItem match {
case OnNext(item) ⇒ subscriber onNext item
case OnError(error) ⇒ subscriber onError error
case OnCompleted ⇒ subscriber.onCompleted()
}
}
def queueItem(item: Notification[T]): Unit = {
val worker = scheduler.createWorker
val shouldScheduleWork = itemsQueueLock synchronized {
val result = !isWorkScheduled
maybeNextItem = Some(item)
isWorkScheduled = true
result
}
if (shouldScheduleWork) {
worker.scheduleRec {
val maybeNextItemToSignal = itemsQueueLock synchronized {
val result = maybeNextItem
if (result.isEmpty) {
worker.unsubscribe()
isWorkScheduled = false
}
maybeNextItem = None
result
}
maybeNextItemToSignal foreach signalToSubscriber
}
}
}
o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
next ⇒ queueItem(OnNext(next)),
error ⇒ queueItem(OnError(error)),
() ⇒ queueItem(OnCompleted)
)
})
}
这似乎可行,但我不确定是否没有竞争条件或死锁。另外,我不确定解决方案是否可以变得更简单。我也一直在考虑另一种方法,比如
- 巧妙地使用了
OperatorDebounceWithSelector
- 一次仅请求一项的可观察对象的组合,
observeOn
和onBackpressureBuffer(1)
我也不知道如何为此编写确定性单元测试。 scheduleRec
调度的工作在与TestScheduler
一起使用时不能被打断,我需要使用一个真正在不同线程上工作的调度器。我发现很难为多线程代码的竞争条件编写正确的单元测试。
所以,问题仍然存在:我的解决方案是否正确?有没有更简单、更好或更正确的方法呢?以及如何测试它的正确性?
我建议使用 lift
来实现这个运算符。这是我的解决方案:
package object ObservableEx {
implicit class ObserveLatestOn[T](val o: Observable[T]) {
def observeLatestOn(scheduler: Scheduler): Observable[T] = {
o.lift { (child: Subscriber[T]) =>
val worker = scheduler.createWorker
child.add(worker)
val parent = new Subscriber[T] {
private val lock = new AnyRef
// protected by "lock"
private var latest: Notification[T] = null
// protected by "lock"
// Means no task runs in the worker
private var idle = true
private var done = false
override def onStart(): Unit = {
request(Long.MaxValue)
}
override def onNext(v: T): Unit = {
if (!done) {
emit(OnNext(v))
}
}
override def onCompleted(): Unit = {
if (!done) {
done = true
emit(OnCompleted)
}
}
override def onError(e: Throwable): Unit = {
if (!done) {
done = true
emit(OnError(e))
}
}
def emit(v: Notification[T]): Unit = {
var shouldSchedule = false
lock.synchronized {
latest = v
if (idle) {
// worker is idle so we should schedule a task
shouldSchedule = true
// We will schedule a task, so the worker will be busy
idle = false
}
}
if (shouldSchedule) {
worker.schedule {
var n: Notification[T] = null
var exit = false
while (!exit) {
lock.synchronized {
if (latest == null) {
// No new item arrives and we are leaving the worker, so set "idle"
idle = true
exit = true
} else {
n = latest
latest = null
}
}
if (!exit) {
n.accept(child)
}
}
}
}
}
}
child.add(parent)
parent
}
}
}
}
还有一个单元测试
import ObservableEx.ObserveLatestOn
@Test
def testObserveLatestOn(): Unit = {
val scheduler = TestScheduler()
val xs = mutable.ArrayBuffer[Long]()
var completed = false
Observable.interval(100 milliseconds, scheduler).take(10).observeLatestOn(scheduler).subscribe(v => {
scheduler.advanceTimeBy(200 milliseconds)
xs += v
},
e => e.printStackTrace(),
() => completed = true
)
scheduler.advanceTimeBy(100 milliseconds)
assert(completed === true)
assert(xs === List(0, 2, 4, 6, 8))
}
我有一个 PR,其中运算符 onBackpressureLatest()
应该具有预期的行为,但您需要并发性并且可以像往常一样使用 observeOn
。