RxJava - 如何通过 doOnNext 等待异步任务的结果
RxJava - how to wait to result of async tasks wihiting doOnNext
如何实现doOnNext等待多个异步任务的结果?
例如-
public void getImages(User user) {
Flowable.create(new FlowableOnSubscribe<User>() {
@Override
public void subscribe(@io.reactivex.rxjava3.annotations.NonNull FlowableEmitter<User> emitter) throws Throwable {
emitter.onNext(user);
}
}, BackpressureStrategy.BUFFER)
.observeOn(Schedulers.io())
.doOnNext(user -> {
ArrayList<String> imagesUrls = user.getUrls();
for (String url : imagesUrls) {
storage.getReference().child("images").child(url).getBytes(ParametersConventions.FIREBASE_DOWNLOAD_IMAGE_MAX_SIZE).
addOnSuccessListener(bytes -> {
doSomething(bytes);
});
}
})
.doOnNext(user -> {
doSomething();
})
.doOnComplete(...);
}
并且我希望调用 doSomething 的 doOnNext 将在所有用于下载图像的异步调用完成后被调用。
doOnNext 运算符在流中每次有新项目时都会触发,因此它不是您的最佳选择。根据您的需要尝试使用 map/flatMap/concatMap 运算符。如果您需要拨打多个电话然后对数据进行处理,您可以查看我已经回答过的类似问题 link:
在其中你可以找到一种方法来进行顺序网络调用,然后用数据列表做任何你想做的事:D
将 API 调用转换为响应式并将其合并到主流中:
int max = ParametersConventions.FIREBASE_DOWNLOAD_IMAGE_MAX_SIZE;
public Completable downloadAsync(URL url) {
return Completable.create(inner -> {
storage.getReference()
.child("images")
.child(url)
.getBytes(max)
.addOnSuccessListener(bytes -> {
doSomething(bytes);
inner.onComplete();
});
});
}
一起:
Flowable.create(emitter-> {
emitter.onNext(user);
}, BackpressureStrategy.BUFFER)
.observeOn(Schedulers.io())
.concatMapSingle(user ->
Flowable.fromIterable(user.getUrls())
.concatMapCompletable(url -> downloadAsync(url))
.andThen(Single.just(user))
)
.doOnNext(user -> {
doSomething();
})
.doOnComplete(...);
如何实现doOnNext等待多个异步任务的结果?
例如-
public void getImages(User user) {
Flowable.create(new FlowableOnSubscribe<User>() {
@Override
public void subscribe(@io.reactivex.rxjava3.annotations.NonNull FlowableEmitter<User> emitter) throws Throwable {
emitter.onNext(user);
}
}, BackpressureStrategy.BUFFER)
.observeOn(Schedulers.io())
.doOnNext(user -> {
ArrayList<String> imagesUrls = user.getUrls();
for (String url : imagesUrls) {
storage.getReference().child("images").child(url).getBytes(ParametersConventions.FIREBASE_DOWNLOAD_IMAGE_MAX_SIZE).
addOnSuccessListener(bytes -> {
doSomething(bytes);
});
}
})
.doOnNext(user -> {
doSomething();
})
.doOnComplete(...);
}
并且我希望调用 doSomething 的 doOnNext 将在所有用于下载图像的异步调用完成后被调用。
doOnNext 运算符在流中每次有新项目时都会触发,因此它不是您的最佳选择。根据您的需要尝试使用 map/flatMap/concatMap 运算符。如果您需要拨打多个电话然后对数据进行处理,您可以查看我已经回答过的类似问题 link:
将 API 调用转换为响应式并将其合并到主流中:
int max = ParametersConventions.FIREBASE_DOWNLOAD_IMAGE_MAX_SIZE;
public Completable downloadAsync(URL url) {
return Completable.create(inner -> {
storage.getReference()
.child("images")
.child(url)
.getBytes(max)
.addOnSuccessListener(bytes -> {
doSomething(bytes);
inner.onComplete();
});
});
}
一起:
Flowable.create(emitter-> {
emitter.onNext(user);
}, BackpressureStrategy.BUFFER)
.observeOn(Schedulers.io())
.concatMapSingle(user ->
Flowable.fromIterable(user.getUrls())
.concatMapCompletable(url -> downloadAsync(url))
.andThen(Single.just(user))
)
.doOnNext(user -> {
doSomething();
})
.doOnComplete(...);