Backpressure 是如何在 RxJava 内部发生的
How does Backpressure happen internally in RxJava
我已经阅读了一些关于 RxJava 背压的文档,但是我找不到详细的解释,比如它是如何在库内部发生的,每个人都只是总结一下,比如 "producer" 太快了 "consumer"太慢了。
例如像下面的代码:
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println);
我一直在浏览 RxJava 源代码,所以我的理解是在主线程中我们将每毫秒发出一次事件,一旦我们发出它,我们将值传递给 System.out.println(i) 方法并将其放入 newThead 调度程序的线程池和 运行 运行nable 中的方法。
所以我的问题是,异常是如何在内部发生的?因为当我们调用 Thread.sleep() 时,我们只是在休眠处理方法调用的线程 -> System.out.println() 而不会影响线程池中的其他线程,怎么会导致异常。是因为线程池没有足够的可用线程了吗?
谢谢
您可以将背压视为一个操作员分发给其上游源的许可系统:您可以给我 128 个元素。稍后,该操作员可能会说 "okay, give me another 96",因此总共可能有 224 个未完成的许可。一些来源,例如 interval
不关心许可,只是定期分发值。由于许可的数量通常与队列或缓冲区中的可用容量密切相关,因此分发超过这些存储量的容量可以容纳收益 MissingBackpressureException
.
检测背压违规主要发生在 offer
到有界队列 returns false 时,例如 observeOn
中的那个表示队列已满。
检测违规的第二种方法是跟踪运营商中未完成的许可计数,例如 onBackpressureDrop
并且每当上游发送超过此数量时,运营商根本不会转发它:
// in onBackpressureDrop
public void onNext(T value) {
if (emitted != availablePermits) {
emitted++;
child.onNext(value);
} else {
// ignoring this value
}
}
子订阅者通过 request() 发出许可信号,这通常会导致 onBackpressureDrop
中的类似情况:
public void childRequested(long n) {
availablePermits += n;
}
实际上,由于可能的异步执行,availablePermits
是一个 AtomicLong
(并称为 requested
)。
我已经阅读了一些关于 RxJava 背压的文档,但是我找不到详细的解释,比如它是如何在库内部发生的,每个人都只是总结一下,比如 "producer" 太快了 "consumer"太慢了。
例如像下面的代码:
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println);
我一直在浏览 RxJava 源代码,所以我的理解是在主线程中我们将每毫秒发出一次事件,一旦我们发出它,我们将值传递给 System.out.println(i) 方法并将其放入 newThead 调度程序的线程池和 运行 运行nable 中的方法。
所以我的问题是,异常是如何在内部发生的?因为当我们调用 Thread.sleep() 时,我们只是在休眠处理方法调用的线程 -> System.out.println() 而不会影响线程池中的其他线程,怎么会导致异常。是因为线程池没有足够的可用线程了吗?
谢谢
您可以将背压视为一个操作员分发给其上游源的许可系统:您可以给我 128 个元素。稍后,该操作员可能会说 "okay, give me another 96",因此总共可能有 224 个未完成的许可。一些来源,例如 interval
不关心许可,只是定期分发值。由于许可的数量通常与队列或缓冲区中的可用容量密切相关,因此分发超过这些存储量的容量可以容纳收益 MissingBackpressureException
.
检测背压违规主要发生在 offer
到有界队列 returns false 时,例如 observeOn
中的那个表示队列已满。
检测违规的第二种方法是跟踪运营商中未完成的许可计数,例如 onBackpressureDrop
并且每当上游发送超过此数量时,运营商根本不会转发它:
// in onBackpressureDrop
public void onNext(T value) {
if (emitted != availablePermits) {
emitted++;
child.onNext(value);
} else {
// ignoring this value
}
}
子订阅者通过 request() 发出许可信号,这通常会导致 onBackpressureDrop
中的类似情况:
public void childRequested(long n) {
availablePermits += n;
}
实际上,由于可能的异步执行,availablePermits
是一个 AtomicLong
(并称为 requested
)。