使用 RxJava 限制吞吐量
Limit throughput with RxJava
我现在遇到的情况很难解释,所以我会写一个更简单的版本来解释这个问题。
我有一个 Observable.from()
,它发出由 ArrayList
文件定义的一系列文件。所有这些文件都应该上传到服务器。为此,我有一个函数可以完成这项工作,returns 和 Observable
。
Observable<Response> uploadFile(File file);
当我 运行 这段代码变得疯狂时,Observable.from()
发出所有文件并且它们全部上传,或者至少对于它可以处理的最大线程。
我想同时上传最多 2 个文件。有接线员可以帮我处理吗?
我尝试了 buffer、window 和其他一些,但它们似乎只同时发出两个项目而不是两个并行文件不断上传。我也尝试在上传部分设置一个最大线程池,但这不能用于我的情况。
应该有一个简单的运算符吧?我错过了什么吗?
我认为所有文件都是并行上传的,因为您使用的是 flatMap()
,它会同时执行所有转换。相反,您应该使用 concatMap()
,其中 运行 是一个接一个的转换。对于 运行 两个并行上传,您需要对您的文件调用 window(2)
可观察,然后像您在代码中所做的那样调用 flatMap()
。
Observable<Response> responses =
files
.window(2)
.concatMap(windowFiles ->
windowFiles.flatMap(file -> uploadFile(file));
);
更新:
我找到了一个更好的解决方案,它完全符合您的要求。 flatMap()
的重载接受最大并发线程数。
Observable<Response> responses =
files
.onBackpressureBuffer()
.flatMap(index -> {
return uploadFile(file).subscribeOn(Schedulers.io());
}, 2);
我现在遇到的情况很难解释,所以我会写一个更简单的版本来解释这个问题。
我有一个 Observable.from()
,它发出由 ArrayList
文件定义的一系列文件。所有这些文件都应该上传到服务器。为此,我有一个函数可以完成这项工作,returns 和 Observable
。
Observable<Response> uploadFile(File file);
当我 运行 这段代码变得疯狂时,Observable.from()
发出所有文件并且它们全部上传,或者至少对于它可以处理的最大线程。
我想同时上传最多 2 个文件。有接线员可以帮我处理吗?
我尝试了 buffer、window 和其他一些,但它们似乎只同时发出两个项目而不是两个并行文件不断上传。我也尝试在上传部分设置一个最大线程池,但这不能用于我的情况。
应该有一个简单的运算符吧?我错过了什么吗?
我认为所有文件都是并行上传的,因为您使用的是 flatMap()
,它会同时执行所有转换。相反,您应该使用 concatMap()
,其中 运行 是一个接一个的转换。对于 运行 两个并行上传,您需要对您的文件调用 window(2)
可观察,然后像您在代码中所做的那样调用 flatMap()
。
Observable<Response> responses =
files
.window(2)
.concatMap(windowFiles ->
windowFiles.flatMap(file -> uploadFile(file));
);
更新:
我找到了一个更好的解决方案,它完全符合您的要求。 flatMap()
的重载接受最大并发线程数。
Observable<Response> responses =
files
.onBackpressureBuffer()
.flatMap(index -> {
return uploadFile(file).subscribeOn(Schedulers.io());
}, 2);