用 RxJava 替换监听器

Replace listeners with RxJava

目前我正在研究向 RxJava 的迁移,并决定我的经理 (accountManager) 将是一个有趣的起点。目前,管理器有一个侦听器列表并相应地发送更新,无论是在帐户更新时还是出现问题时。

private List<WeakReference<ProfileChangeListener>> mListeners = new ArrayList<>();

public interface ProfileChangeListener {
    void onProfileUpdated(Account account);
    void onProfileFailed(Exception e);
}

我的 Rx 解决方案涉及 Subject

private SerializedSubject<Account, Account> mManagerSubject = new SerializedSubject<>(BehaviorSubject.<Account>create());

public Observable<Account> observe() {
    return mManagerSubject;
}

然后当发生更新时,我调用以下方法之一:

private void onProfileUpdated(Account account) {
    mManagerSubject.onNext(account);
}

private void onProfileFailed(final Exception e) {
    mManagerSubject.onError(e);
}

问题

问题是,一旦 onError 被调用,任何通过 observe 收听的人将永远不会从 onNext 获得另一个更新。

我仍然希望订阅者收到 onError 以便他们可以处理错误状态,但稍后 onNext 仍然可以使用更新后的帐户进行调用,我仍然希望订阅者处理更新后的帐户。

我尝试过使用 onErrorResumeNextonErrorReturn onExceptionResumeNext 的解决方案,但其中 none 会传播 onError.

TLDR:如何在调用 onError 后保持订阅者订阅,同时仍在传播 onError?

Rx 中的

"Errors" 一开始可能有点难以理解,因为它们的含义与大多数人的预期略有不同。

来自 Error Handling documentation(强调我的):

An Observable typically does not throw exceptions. Instead it notifies any observers that an unrecoverable error has occurred by terminating the Observable sequence with an onError notification.

onError() 应该在 Observable 遇到 不可恢复的 错误时使用 - 即当您的 Observable 无法继续发射物品时.当您订阅时,您可能会使用 onErrorResumeNext 之类的东西来尝试一些恢复操作,但这应该是源 Observable.

的结尾

相反,您可能需要调整 Observable 发出的内容以支持发出错误项,或包括一个标志以指示遇到错误。

如果您的错误确实无法恢复,那么您可能需要重新审视您的恢复策略并尝试稍微不同的方法。