在不丢弃项目或在 RxJava 中序列化的情况下处理背压
Handling backpressure without dropping items or serializing in RxJava
简而言之,有什么解决方案可以解决 RxJava 中的背压问题,而无需诉诸于删除项、序列化操作或无限缓冲?
将以下任务作为这可能有用的示例。
- 正在将数据从磁盘读入内存
- 压缩数据
- 正在通过网络传输压缩数据
直接的方法是在单个后台线程上按顺序执行所有任务,如:
observeBlocksOfFileContents(file).
.subscribeOn(backgroundScheduler)
.map(compressBlock)
.subscribe(transmitBlock);
虽然这没有问题,但从性能的角度来看,它不是最优的,因为 运行 时间是所有三个操作的总和,而不是它们的最大值,因为它是 运行并行:
observeBlocksOfFileContents(file).
.subscribeOn(diskScheduler)
.observeOn(cpuScheduler)
.map(compressBlock)
.observeOn(networkScheduler)
.subscribe(transmitBlock);
但是,如果从磁盘读取数据的速度快于压缩和传输数据的速度,则可能会由于背压而失败。由于以下原因,通常的背压解决方案是不可取的:
- 丢件:文件必须完整传送,不漏件
- 单线程序列化:失去流水线的性能提升
- 调用堆栈阻塞:not supported in RxJava
- 增加observeOn buffers:内存消耗可能变成文件大小的数倍
- 在没有 MissingBackpressureException 的情况下重新实现 observeOn:大量工作并且中断流畅 API
还有其他解决办法吗?或者这是根本不适合 ReactiveX 可观察模型的东西?
6) 实施 observeBlocksOfFileContents 以支持背压。
文件系统已经是基于拉取的(InputStream.read() 发生在你想要它而不是扔给你的时候)所以考虑一个合理的块大小并在每个请求中读取它:
Observable.create(SyncOnSubscribe.createStateful(
() -> new FileInputStream("file.dat")
(in, out) -> {
byte[] buf = new byte[4096];
int r = in.read(buf);
if (r < 0) {
out.onCompleted();
} else {
if (r == buf.length) {
out.onNext(buf);
} else {
byte[] buf2 = new byte[r];
System.arraycopy(buf, 0, buf2, 0, r);
out.onNext(buf2);
}
}
},
in -> in.close()
));
(为简洁起见省略了 Try-catch。)
简而言之,有什么解决方案可以解决 RxJava 中的背压问题,而无需诉诸于删除项、序列化操作或无限缓冲?
将以下任务作为这可能有用的示例。
- 正在将数据从磁盘读入内存
- 压缩数据
- 正在通过网络传输压缩数据
直接的方法是在单个后台线程上按顺序执行所有任务,如:
observeBlocksOfFileContents(file).
.subscribeOn(backgroundScheduler)
.map(compressBlock)
.subscribe(transmitBlock);
虽然这没有问题,但从性能的角度来看,它不是最优的,因为 运行 时间是所有三个操作的总和,而不是它们的最大值,因为它是 运行并行:
observeBlocksOfFileContents(file).
.subscribeOn(diskScheduler)
.observeOn(cpuScheduler)
.map(compressBlock)
.observeOn(networkScheduler)
.subscribe(transmitBlock);
但是,如果从磁盘读取数据的速度快于压缩和传输数据的速度,则可能会由于背压而失败。由于以下原因,通常的背压解决方案是不可取的:
- 丢件:文件必须完整传送,不漏件
- 单线程序列化:失去流水线的性能提升
- 调用堆栈阻塞:not supported in RxJava
- 增加observeOn buffers:内存消耗可能变成文件大小的数倍
- 在没有 MissingBackpressureException 的情况下重新实现 observeOn:大量工作并且中断流畅 API
还有其他解决办法吗?或者这是根本不适合 ReactiveX 可观察模型的东西?
6) 实施 observeBlocksOfFileContents 以支持背压。
文件系统已经是基于拉取的(InputStream.read() 发生在你想要它而不是扔给你的时候)所以考虑一个合理的块大小并在每个请求中读取它:
Observable.create(SyncOnSubscribe.createStateful(
() -> new FileInputStream("file.dat")
(in, out) -> {
byte[] buf = new byte[4096];
int r = in.read(buf);
if (r < 0) {
out.onCompleted();
} else {
if (r == buf.length) {
out.onNext(buf);
} else {
byte[] buf2 = new byte[r];
System.arraycopy(buf, 0, buf2, 0, r);
out.onNext(buf2);
}
}
},
in -> in.close()
));
(为简洁起见省略了 Try-catch。)