当其中之一出错时终止 Observable 链

Terminate Observable chain when one of them errors

我有一系列 Rx Completables,我想一个接一个地 运行。我正在使用 concat() 来执行此操作,因为我不希望它们全部同时启动。

view.welcome_message_edittext.verifyNotEmpty(getString(R.string.enter_your_email_address))
    .concatWith(view.welcome_message_edittext.verifyEmailAddress())
    .concatWith(sendMessageToBot())
    .subscribe({
        // The user has successfully entered data into the edittext, entered an email into the edittext, and sent message to bot. 
    }, { error -> })

上面的代码是这样说的,"Assert the user has entered text into the EditText. If that is true, assert the user has entered an email into the EditText. If both of those are true, send a message to the bot." 如果用户在 EditText 中输入文本但它不是电子邮件,我预计 Completables 链会中断并调用 onError()。

这就是我想要发生的事情^^^。当 Completables 的 any 调用 onError() 时(如果用户将 EditText 留空或不输入电子邮件地址,verifyNotEmpty()verifyEmailAddress() 会这样做)那么我期望整个链终止并调用 .subscribe() onError() 函数。

但是,查看 .concat() 的文档,这是它的实际行为:

当调用 onError 时,

concat() 将简单地移动到下一个 Completable。链条继续。

所以我的问题是,当任何 Completables 调用 onError() 时,我需要使用什么来打破链条?

感谢@Buckstabue 在评论中帮助我调试此问题。他的评论:

Let me guess. It's absolutely normal that the method verifyEmailAddress() is called and I suspect you are doing some business logic right there outside of an observable. You can put that logic inside the observable and it will be calculated lazily It's similar to difference between Observable.just(getMyInteger()) and Observable.fromCallable(() -> getMyInteger()). In the second case getMyInteger() will be lazily called after subscribing while the first one is called immediately

回到我的代码并查看我的 verifyEmailAddress()sendMessageToBot() 函数:

private fun sendMessageToBot(): Completable {
    insertChatMessageIntoConversation(ChatMessage(view!!. welcome_message_edittext.text.toString()))
    return Completable.complete()
}

fun EditText.verifyEmailAddress(): Completable {
    if (!android.util.Patterns.EMAIL_ADDRESS.matcher(text.trim()).matches()) {
        return Completable.error(RuntimeException("Enter a valid email"))
    } else {
        return Completable.complete()
    }
}

函数的逻辑不在 Completable 块的 内部。当我编写代码时,我认为这并不重要,因为我认为 Rx 的行为是它执行每个 Completable 并等待它们完成或完全错误,然后再移动到下一个 Completeable。因此,完全跳过 sendMessageToBot()verifyEmailAddress() 函数。不是这样的。

这个有效:

fun EditText.verifyEmailAddress(): Completable {
    return Completable.fromCallable({
        if (!android.util.Patterns.EMAIL_ADDRESS.matcher(text.trim()).matches()) {
            val errorMessage = context.getString(R.string.enter_email_address)
            error = errorMessage
            throw RuntimeException(errorMessage)
        }
    })
}

private fun sendMessageToBot(): Completable {
    return Completable.fromCallable {
        insertChatMessageIntoConversation(sage(view!!. welcome_message_edittext.text.toString()))
    }
}