如何在缓冲区为空时停止从 PublishSubject 发射
How to stop emission from PublishSubject when buffer is empty
我正在尝试使用 RxJava 创建顺序下载服务。
用户可以批量添加项目(20、30 等)或单个项目。这些项目将被添加到队列中,然后以 10 个为一组顺序下载。
为此,我正在使用 PublishSubject:
PublishSubject<Int> pubSubject = PublishSubject.create();
它发出用户添加的项目 (id),然后将缓冲区运算符应用于批处理项目。使用这些 id,项目在 flatMap 中下载,在订阅的 onNext 中 return。
pubSubject.buffer(1, TimeUnit.SECONDS, 10)
.observeOn(Schedulers.io())
.flatMap { idsBatch -> downloadByIds(idsBatch) }
.subscribe(
/* onNext */ { apiResponse -> handleResponse() },
/* onError */ { handleError(it) },
/* onComplete*/ { hideProgressBar() }
)
代码大部分都按预期运行。项目已成功批处理和下载,但即使在所有项目都发出后,缓冲区仍会使用空列表调用 flatMap 并且永远不会调用 onComplete()。
我想知道RxJava有没有什么方法或者方式可以在buffer中没有更多的item时得到onComplete的回调。因为否则我的下载服务永远不会终止。
您可以使用 takeWhile
操作:
Returns an Observable
that emits items emitted by the source ObservableSource
so long as each item satisfied a specified condition, and then completes as soon as this condition is not satisfied.
pubSubject.buffer(1, TimeUnit.SECONDS, 10)
.observeOn(Schedulers.io())
.takeWhile { idsBatch -> idsBatch.isNotEmpty() }
.flatMap { idsBatch -> downloadByIds(idsBatch) }
.subscribe(
/* onNext */ { apiResponse -> handleResponse() },
/* onError */ { handleError(it) },
/* onComplete*/ { hideProgressBar() }
)
我正在尝试使用 RxJava 创建顺序下载服务。 用户可以批量添加项目(20、30 等)或单个项目。这些项目将被添加到队列中,然后以 10 个为一组顺序下载。 为此,我正在使用 PublishSubject:
PublishSubject<Int> pubSubject = PublishSubject.create();
它发出用户添加的项目 (id),然后将缓冲区运算符应用于批处理项目。使用这些 id,项目在 flatMap 中下载,在订阅的 onNext 中 return。
pubSubject.buffer(1, TimeUnit.SECONDS, 10)
.observeOn(Schedulers.io())
.flatMap { idsBatch -> downloadByIds(idsBatch) }
.subscribe(
/* onNext */ { apiResponse -> handleResponse() },
/* onError */ { handleError(it) },
/* onComplete*/ { hideProgressBar() }
)
代码大部分都按预期运行。项目已成功批处理和下载,但即使在所有项目都发出后,缓冲区仍会使用空列表调用 flatMap 并且永远不会调用 onComplete()。
我想知道RxJava有没有什么方法或者方式可以在buffer中没有更多的item时得到onComplete的回调。因为否则我的下载服务永远不会终止。
您可以使用 takeWhile
操作:
Returns an
Observable
that emits items emitted by the sourceObservableSource
so long as each item satisfied a specified condition, and then completes as soon as this condition is not satisfied.
pubSubject.buffer(1, TimeUnit.SECONDS, 10)
.observeOn(Schedulers.io())
.takeWhile { idsBatch -> idsBatch.isNotEmpty() }
.flatMap { idsBatch -> downloadByIds(idsBatch) }
.subscribe(
/* onNext */ { apiResponse -> handleResponse() },
/* onError */ { handleError(it) },
/* onComplete*/ { hideProgressBar() }
)