如何在缓冲区为空时停止从 PublishSubject 发射

How to stop emission from PublishSubject when buffer is empty

我正在尝试使用 RxJava 创建顺序下载服务。 用户可以批量添加项目(20、30 等)或单个项目。这些项目将被添加到队列中,然后以 10 个为一组顺序下载。 为此,我正在使用 PublishSubject:

PublishSubject<Int> pubSubject = PublishSubject.create();

它发出用户添加的项目 (id),然后将缓冲区运算符应用于批处理项目。使用这些 id,项目在 flatMap 中下载,在订阅的 onNext 中 return。

  pubSubject.buffer(1, TimeUnit.SECONDS, 10)
            .observeOn(Schedulers.io())
            .flatMap { idsBatch -> downloadByIds(idsBatch) }
            .subscribe(
                /* onNext */ { apiResponse -> handleResponse() },
                /* onError */ { handleError(it) },
                /* onComplete*/ { hideProgressBar() }
             )

代码大部分都按预期运行。项目已成功批处理和下载,但即使在所有项目都发出后,缓冲区仍会使用空列表调用 flatMap 并且永远不会调用 onComplete()。

我想知道RxJava有没有什么方法或者方式可以在buffer中没有更多的item时得到onComplete的回调。因为否则我的下载服务永远不会终止。

您可以使用 takeWhile 操作:

Returns an Observable that emits items emitted by the source ObservableSource so long as each item satisfied a specified condition, and then completes as soon as this condition is not satisfied.

pubSubject.buffer(1, TimeUnit.SECONDS, 10)
          .observeOn(Schedulers.io())
          .takeWhile { idsBatch -> idsBatch.isNotEmpty() }
          .flatMap { idsBatch -> downloadByIds(idsBatch) }
          .subscribe(
              /* onNext */ { apiResponse -> handleResponse() },
              /* onError */ { handleError(it) },
              /* onComplete*/ { hideProgressBar() }
           )