在 运行 另一个 observable 之前使用 concatMap 运行 Single

Using concatMap to run a Single before running another observable

Android Studio 3.1 RC 2
kotlin 1.2.30

Java

中fetchMessage的签名
Single<Response> fetchMessage(final String Id); 

kotlin 代码

fun translate(Id: String): Completable {
        return repository.fetchMessage(Id)
                .flatMap {
                    Single.fromCallable<State>({
                        update(messageId, it, State.COMPLETED)
                        State.COMPLETED
                    })
                }
                .onErrorReturn({
                    update(Id, null, State.ERROR)
                    State.ERROR
                })
                .toCompletable()
    }

我想在fetchMessage

之前运行的方法
 fun insertMessage(Id: String): Completable {
        return Completable.fromCallable {
            insert(Id, State.IDLE)
        }
    }

我希望 insertMessage e 以某种方式 运行 在 fetchMessage 之前。我正在考虑使用 concatMap 但不确定如何组合翻译和 insertMessage。这样 insertMessage 将首先 运行 然后一旦完成,翻译将 运行.

非常感谢您的任何建议,

Update solution 1 using startWith(..):

通过将翻译方法的 return 更改为 Single。我是这样做的:

fun translate(Id: String): Single<State> {
        return repository.fetchMessage(Id)
                .flatMap {
                    Single.fromCallable<State>({
                        update(messageId, it, State.COMPLETED)
                        State.COMPLETED
                    })
                }
                .onErrorReturn({
                    update(Id, null, State.ERROR)
                    State.ERROR
                })
    }

然后我可以有一个方法来执行以下 insertMessage(..) -> translate(..):

translate(Id).toCompletable().startWith(insertMessage(id, State.IDLE))

这是一个理想的解决方案吗?

Update solution 2 using concatWith(..):

我的return在链中调用一个 Observable 并调用 toObservable()。

fun translate(Id: String): Observable<State> {
        return repository.fetchMessage(Id)
                .flatMap {
                    Single.fromCallable<State>({
                        update(messageId, it, State.COMPLETED)
                        State.COMPLETED
                    })
                }
                .onErrorReturn({
                    update(Id, null, State.ERROR)
                    State.ERROR
                })
                .toObservable()
    }

而且我可以使用 concatWith,因此序列为 insertMessage(..) -> translate(..):

translate(Id).toCompletable().concatWith(insertMessage(id, State.IDLE).toObservable())
    .toCompletable()

这些是正确的解决方案吗?

如果你有一个 Completable,你可以通过 andThen:

链接任何其他反应类型
insertMessage("id")
.andThen(translate("id"))

你的两个选择都有意义,但我建议你稍微清理一下。

首先,您需要清楚地了解在每种情况下使用什么 return 类型:Observable, Single or Completable.

定义如下:

  • Single 表示发出单个值或错误的 Observable。
  • Completable 表示 Observable 不发出任何值,但只发出终端事件,onError 或 onCompleted。

在这两种情况下,您都不需要任何数据 returned,您只需要知道操作是否成功。 Completable 正是为处理这种情况而设计的。

所以我建议您:

fun translate(Id: String): Completable {
    return repository.fetchMessage(Id)
             .flatMapCompletable {
                Completable.fromAction {
                  update(messageId, it, State.COMPLETED)
                }
             }.doOnError {
                update(Id, null, State.ERROR)
             }
}

fun insertMessage(Id: String): Completable {
    return Completable.fromCallable {
        insert(Id, State.IDLE)
    }
}

使用 Completable.fromAction instead of Completable.fromCallable 是让您的代码更简洁的好选择,因此您不需要 return 任何东西。

然后您可以使用您的任何选项,startWith or concatWith。两者都等到第一个 observable 在 运行 第二个 observable 之前完成。我更喜欢使用 concatWith,因为它按照编写函数的相同顺序运行函数。

最终我们得到了一个优雅的解决方案:

insertMessage(id).concatWith(translate(id))

translate(id).startWith(insertMessage(id))

关于 concat 的更多信息:http://reactivex.io/documentation/operators/concat.html

如果你好奇的话,这里是 rxJava 库中函数的实现:

public final Completable startWith(Completable other) {
    requireNonNull(other);
    return concat(other, this);
}

public final Completable concatWith(Completable other) {
    requireNonNull(other);
    return concat(this, other);
}

如您所见,唯一的区别是顺序。