在另一个 Observable 中订阅一个 Observable
Subscribing an observable inside another Observable
我正在调试我的代码,它从数据库中获取 UserWallet
s,然后通过连接外部 REST API 为它们生成地址。现在我有一个订阅嵌套在另一个订阅中,但我读到它是一个糟糕的解决方案(它实际上不起作用,我认为这就是原因)。
userWalletDao.getUnregisteredUserWallets()
.subscribe(nextWallet -> {
log.info("Fetched next wallet for registration {}", nextWallet);
blockchainIntegration.registerUserWallet(nextWallet.getUserId())
.subscribe(address -> {
nextWallet.setAddress(address);
userWalletDao.persistUserWalletAddress(nextWallet);
log.info("Registered wallet {} with address {}.", nextWallet, address);
});
});
我试图在一个订阅中完成它,但是如果我将钱包映射到地址,我会丢失一个 UserWallet
对象来为其设置一个获取的地址并将其保存回数据库中。
如何获取钱包,然后调用 API 为它生成一个订阅地址?
getUnregisteredUserWallets()
returns Observable<UserWallet>
和 registerUserWallet()
returns Single<String>
.
强烈建议阅读并理解第一条评论中提到的依赖子流。
您可以通过将可观察序列更改为类似这样的内容来解决您的问题
userWalletDao.getUnregisteredUserWallets()
.flatMap(nextWallet -> registerUserWallet(nextWallet.getUserId()).toObservable()
.flatMap(address -> Observable.fromCallable(() -> new Pair<>(nextWallet, address)))) // return both wallet from previous mapping and address from current mapping to the next level
.flatMapCompletable(walletAddressPair -> Completable.fromAction(()->{
Wallet nextWallet = walletAddressPair.first;
String address = walletAddressPair.second;
nextWallet.setAddress(address);
userWalletDao.persistUserWalletAddress(nextWallet);
log.info("Registered wallet {} with address {}.", nextWallet, address);
// here wallet and address have been saved to db. This operation is a completable action, you don't have to return any result
// from it and forward to the next level. Thats why flatMapCompletable is used.
}))
.subscribeWith(new DisposableCompletableObserver() {
@Override
public void onComplete() {
// All actions completed
}
@Override
public void onError(Throwable e) {
// any error occurred in the observable chain
}
});
我正在调试我的代码,它从数据库中获取 UserWallet
s,然后通过连接外部 REST API 为它们生成地址。现在我有一个订阅嵌套在另一个订阅中,但我读到它是一个糟糕的解决方案(它实际上不起作用,我认为这就是原因)。
userWalletDao.getUnregisteredUserWallets()
.subscribe(nextWallet -> {
log.info("Fetched next wallet for registration {}", nextWallet);
blockchainIntegration.registerUserWallet(nextWallet.getUserId())
.subscribe(address -> {
nextWallet.setAddress(address);
userWalletDao.persistUserWalletAddress(nextWallet);
log.info("Registered wallet {} with address {}.", nextWallet, address);
});
});
我试图在一个订阅中完成它,但是如果我将钱包映射到地址,我会丢失一个 UserWallet
对象来为其设置一个获取的地址并将其保存回数据库中。
如何获取钱包,然后调用 API 为它生成一个订阅地址?
getUnregisteredUserWallets()
returns Observable<UserWallet>
和 registerUserWallet()
returns Single<String>
.
强烈建议阅读并理解第一条评论中提到的依赖子流。
您可以通过将可观察序列更改为类似这样的内容来解决您的问题
userWalletDao.getUnregisteredUserWallets()
.flatMap(nextWallet -> registerUserWallet(nextWallet.getUserId()).toObservable()
.flatMap(address -> Observable.fromCallable(() -> new Pair<>(nextWallet, address)))) // return both wallet from previous mapping and address from current mapping to the next level
.flatMapCompletable(walletAddressPair -> Completable.fromAction(()->{
Wallet nextWallet = walletAddressPair.first;
String address = walletAddressPair.second;
nextWallet.setAddress(address);
userWalletDao.persistUserWalletAddress(nextWallet);
log.info("Registered wallet {} with address {}.", nextWallet, address);
// here wallet and address have been saved to db. This operation is a completable action, you don't have to return any result
// from it and forward to the next level. Thats why flatMapCompletable is used.
}))
.subscribeWith(new DisposableCompletableObserver() {
@Override
public void onComplete() {
// All actions completed
}
@Override
public void onError(Throwable e) {
// any error occurred in the observable chain
}
});