RxJava - 如何通过 doOnNext 等待异步任务的结果

RxJava - how to wait to result of async tasks wihiting doOnNext

如何实现doOnNext等待多个异步任务的结果?

例如-

public void getImages(User user) {
    Flowable.create(new FlowableOnSubscribe<User>() {
        @Override
        public void subscribe(@io.reactivex.rxjava3.annotations.NonNull FlowableEmitter<User> emitter) throws Throwable {
            emitter.onNext(user);
        }
    }, BackpressureStrategy.BUFFER)
            .observeOn(Schedulers.io())
            .doOnNext(user -> {
                ArrayList<String> imagesUrls = user.getUrls();
                for (String url : imagesUrls) {
                    storage.getReference().child("images").child(url).getBytes(ParametersConventions.FIREBASE_DOWNLOAD_IMAGE_MAX_SIZE).
                    addOnSuccessListener(bytes -> {
                       doSomething(bytes);    
                    });
                }
            })
            .doOnNext(user -> {
                doSomething();
            })
            .doOnComplete(...);
}

并且我希望调用 doSomething 的 doOnNext 将在所有用于下载图像的异步调用完成后被调用。

doOnNext 运算符在流中每次有新项目时都会触发,因此它不是您的最佳选择。根据您的需要尝试使用 map/flatMap/concatMap 运算符。如果您需要拨打多个电话然后对数据进行处理,您可以查看我已经回答过的类似问题 link: 在其中你可以找到一种方法来进行顺序网络调用,然后用数据列表做任何你想做的事:D

将 API 调用转换为响应式并将其合并到主流中:

int max = ParametersConventions.FIREBASE_DOWNLOAD_IMAGE_MAX_SIZE;

public Completable downloadAsync(URL url) {
    return Completable.create(inner -> {
                           storage.getReference()
                           .child("images")
                           .child(url)
                           .getBytes(max)
                           .addOnSuccessListener(bytes -> {
                                doSomething(bytes);
                                inner.onComplete();
                           });
                     });
}

一起:


Flowable.create(emitter-> {
       emitter.onNext(user);
    }, BackpressureStrategy.BUFFER)
            .observeOn(Schedulers.io())
            .concatMapSingle(user -> 
                 Flowable.fromIterable(user.getUrls())
                     .concatMapCompletable(url -> downloadAsync(url))
                     .andThen(Single.just(user))
             )
             .doOnNext(user -> {
                 doSomething();
             })
             .doOnComplete(...);