使用 RxJava 限制吞吐量

Limit throughput with RxJava

我现在遇到的情况很难解释,所以我会写一个更简单的版本来解释这个问题。

我有一个 Observable.from(),它发出由 ArrayList 文件定义的一系列文件。所有这些文件都应该上传到服务器。为此,我有一个函数可以完成这项工作,returns 和 Observable

Observable<Response> uploadFile(File file);

当我 运行 这段代码变得疯狂时,Observable.from() 发出所有文件并且它们全部上传,或者至少对于它可以处理的最大线程。

我想同时上传最多 2 个文件。有接线员可以帮我处理吗?

我尝试了 bufferwindow 和其他一些,但它们似乎只同时发出两个项目而不是两个并行文件不断上传。我也尝试在上传部分设置一个最大线程池,但这不能用于我的情况。

应该有一个简单的运算符吧?我错过了什么吗?

我认为所有文件都是并行上传的,因为您使用的是 flatMap(),它会同时执行所有转换。相反,您应该使用 concatMap(),其中 运行 是一个接一个的转换。对于 运行 两个并行上传,您需要对您的文件调用 window(2) 可观察,然后像您在代码中所做的那样调用 flatMap()

Observable<Response> responses = 
    files
      .window(2)
      .concatMap(windowFiles ->
        windowFiles.flatMap(file -> uploadFile(file));
      );

更新:

我找到了一个更好的解决方案,它完全符合您的要求。 flatMap() 的重载接受最大并发线程数。

Observable<Response> responses = 
    files
      .onBackpressureBuffer()
      .flatMap(index -> {
        return uploadFile(file).subscribeOn(Schedulers.io());
      }, 2);