Observable.create() 在 RxJava 2 中的正确使用(最佳实践)

Proper usage of Observable.create() in RxJava 2 (Best Practices)

我目前正在构建一个利用 RxJava 2Firebase 的小型社交媒体风格应用程序。我正在使用 MVP 风格的架构,并且我已经使用一个名为 AuthSource 的接口抽象出我的 AuthService。

为简单起见,我将在我的服务中使用单一方法:

public class FirebaseAuthService implements AuthSource {

private FirebaseAuth auth;
private FirebaseAuth.AuthStateListener listener;

//initialization code

@Override
public Maybe<User> getUser() {
    return Maybe.create(new MaybeOnSubscribe<User>() {
                            @Override
                            public void subscribe(final MaybeEmitter<User> e) throws Exception {
                                if (auth == null) {
                                    auth = FirebaseAuth.getInstance();
                                }

                                if (listener != null) {
                                    auth.removeAuthStateListener(listener);
                                }

                                listener = new FirebaseAuth.AuthStateListener() {
                                    @Override
                                    public void onAuthStateChanged(@NonNull FirebaseAuth firebaseAuth) {
                                        FirebaseUser firebaseUser = firebaseAuth.getCurrentUser();
                                        auth.removeAuthStateListener(listener);
                                        if (firebaseUser != null) {
                                            User user = new User(
                                                    firebaseUser.getDisplayName(),
                                                    firebaseUser.getEmail());

                                            user.setUserId(firebaseUser.getUid());


                                            Uri photoUrl = firebaseUser.getPhotoUrl();
                                            if (photoUrl != null){
                                                user.setProfilePhotoUrl(photoUrl.toString());
                                            }
                                            e.onSuccess(user);
                                        } else {
                                            e.onComplete();
                                        }
                                    }
                                };

                                auth.addAuthStateListener(listener);
                            }
                        }
    );

}

}

interface AuthSource {
    Maybe<User> getUser();
//Other methods etc.
}

最后,我将展示处理调用的 Presenter 方法:

//from with a Presenter:
@Override
private void getUserData() {
    disposableSubscriptions.add(
            auth.getUser().subscribeOn(schedulerProvider.io())
                    .observeOn(schedulerProvider.ui())
                    .subscribeWith(
                            new DisposableMaybeObserver<User>() {

                                @Override
                                public void onError(Throwable e) {
                                    view.makeToast(R.string.error_retrieving_data);
                                    view.startDispatchActivity();
                                }

                                @Override
                                public void onComplete() {

                                }

                                @Override
                                public void onSuccess(User user) {
                                    ProfilePagePresenter.this.currentUser = user;
                                    view.setName(user.getName());
                                    view.setEmail(user.getEmail());
                                    if (user.getProfilePhotoUrl().equals("")) {
                                        view.setDefaultProfilePhoto();
                                    } else {
                                        view.setProfilePhotoURI(user.getProfilePhotoUrl());
                                    }

                                    getUserProfileFromDatabase();

                                }
                            }
                    )
    );
}

我意识到问题的主题有点笼统,所以我会尝试从这里缩小范围。在我使用 Create() 从 Firebase API 成功获取数据的情况下,我在上面发布的代码 有效 。问题是,我对使用 RxJava 2 还很陌生,我不确定垃圾收集和内存泄漏的幕后情况。根据 RxJava 2 文档,我选择使用 Observable.create():

"Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world."

RxJava 2 Docs 最后,我目前为处理这些 Observable 所做的唯一积极主动的事情是,当事件将用户带到新的 Activity 时,在我的 Presenter 中调用 CompositeDisposable.clear()。


问题:

- 可以安全地假设当 Presenter 完成时简单地调用 CompositeDisposable.clear() 将处理我的垃圾收集吗? (假设我没有在其余代码中造成内存泄漏)。

-如果我的理解是正确的,在这种情况下,create() 是比 fromCallable() 更好的选择,因为 fromCallable() 应该用于同步事件(即不像 Firebase API回调)?

-它真的像在 Observable.create() 中抛出我的异步回调一样简单吗?我很害怕这样做是多么容易...

  • 调用 clear 将分离订阅者 - 对发出的事件做出反应的代码,来自 Observable,结果订阅者被 presenter/activity 包围并具有硬引用对它来说,将不再被观察者持有并且比 presenter/activity 生命周期更长。
    但是,请注意,如果您的 Observable 本身包含对您的 presenter/activity.
    的引用,您仍然可能导致泄漏 在任何一种情况下,当您通过 static 或其他存在于比您的 activity/presenter 更长的上下文(例如应用程序)上下文中的对象引用您的 activity/presenter 时,都会发生泄漏。 =11=]

  • 确实,create() 方法是从异步方法创建 Observable 的正确方法(顺便说一句,在 RxJava1 中,有一种不同的过时方法也称为 create,但在 RxJava2 中已更改,所以不会有错误地创建 Observable 的方法,但那是另一回事)

  • 嗯,你还是要服从Observable contract, 确保会有终端事件(onComplete/onError), 在终端事件(onCompleted/onError)和背压(由 Flowable Observable 强制执行)

  • 之后将不会有 onNext

Is it safe to assume that simply calling CompositeDisposable.clear() when the Presenter finishes, will handle my Garbage collection? (assuming I haven't created memory leaks in the rest of the code).

比这更棘手一点。如果 Observable 引用的所有内容都属于 Activity 范围,Non-disposed Observable 不会造成内存泄漏。生产者和消费者都将与 Activity 一起被垃圾收集。如果您引用的资源将在 Activity 之后继续存在,例如在 Application 级别实例化的提供程序,则可能会发生内存泄漏。因此,如果您想使用 CompositeDisposable.clear(),请确保在 Observable.create() 中实现 emitter.setCancellable() 以处理那些泄漏的资源。

If my understanding is correct, create() is a better option to use than fromCallable() in this case, as fromCallable() should be used for Synchronous events (i.e. not something like Firebase API callbacks)?

create() 用于命名 fromAsync()。使用 fromCallable() 包装同步方法调用,包装回调代码时使用 create()

Is it really as simple as just throwing my Asynchronous callbacks in Observable.create()? I'm terrified at how easy that is to do...

这很容易...如果您像第一点提到的那样处理范围之外的那些讨厌的引用。

通常在 Android 上,内存泄漏涉及 Context,这是很大的。请务必测试您的代码。 leakcanary对此事有很大的帮助

最后,您可以通过使用现有的 Firebase RxJava 绑定避免自己进行包装。或从中汲取灵感: