如何在 RxJava 中将单个的输出传递给可完成的?

How to pass output of a single to a completable in RxJava?

我想调用 API api1 并将 API 编辑的输出 return 传递给第二个 API api2 .第一个 API 是一个获取请求,第二个 API 是一个 POST 请求。因此,api1return是Single<String>api2return是Completable

函数看起来像这样:

// api1
public Single<String> getToken() {
  ...
}
// api2
public Completable saveTokenToBackend(String token, String userId) {
 ...
}

我想将这两个操作链接在一起。订阅者只对获取令牌和保存令牌的过程是否成功感兴趣。因此,最终操作链的 return 类型应该是 Completable。但是,当我这样做时,API 要求 api2 停止发生。根据日志,只有 api1 成功运行。

Single<Completable> r1 = getToken().map(t -> saveTokenToBackend(t, userId));
Completable r2 = Completable.fromSingle(r1);

我这里更广泛的问题是,我如何将响应从 Single 链接到 Completable?

第二个问题是为什么上面的代码不起作用?

::编辑::

根据评论中的建议,我尝试了:

public Completable getAndSaveToken() {
    getToken().flatMapCompletable(t -> saveTokenToBackend(t, "dummyuser");
}

在我的应用程序代码中,我正在做:

getAndSaveToken()
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(() -> {
                Log.v(TAG, "call success");
    }, error -> {
                Log.e(TAG, "Error", error);
    });

结果:java.io.IOException: Must not be called on the main application thread

你有 2 个不同的问题,构图和调度。上下文:getToken return 一个,saveToken(token) return 一个完整的。

composition: 组合一个单一的和一个可完成的,正如你已经注意到的,你可以使用 flatMap 运算符,这个 returns 一个新的可完成的,首先得到令牌,然后保存它(可能,您需要在保存之前修改令牌 ;)

getToken().flatMapCompletable(n -> saveToken(n)) // returns a completable

如果你想把它作为一个单一的,那么你可以将它映射回第一个实例:

getToken().flatMap(n -> saveToken(n).toSingleDefault(n)) // returns a single

调度: 在android中,不能在主线程中发起请求。为避免它,您可以使用 subscribeOn 运算符,看起来您已经注意到

getToken().subscribeOn(io()).doOnNext(n -> {/*this should be evaluated in io thread*/})
getToken().subscribeOn(io()).observeOn(mainThread()).doOnNext(n -> {/*this on mainThread*/})

如果您仍然遇到错误,那么 observeOnsubscribeOn 已在某处重新配置。需要更多代码才能确定。但是无论如何,您可以断言这两个请求都在应用运算符的 io 线程中执行了两次:

getToken().subscribeOn(io).flatMapCompletable(token -> saveToken(token).subscribeOn(io()))

另一种选择,如果您正在使用改造,则使用 createWithScheduler(io()) 工厂而不是默认工厂来应用 RxJava2CallAdapterFactory。这将断言所有请求都在 io() 中创建,因此您可以组合和准备数据,最后应用 observeOn(mainThread()) 更新 UI.