RxJava 和 long 运行 监听器

RxJava and long running listener

好的,所以我是 RxJava2 的新手(好吧,我也不知道 RxJava)并且正在尝试使用 RxJava2 和 MVP 结构开发一个 Android 应用程序。

在那个应用程序中,我正在对使用侦听器的库进行异步调用。 我使用 "standard" setListener / registerListener 方法设置监听器。

其中一个方法是返回值 "realtime" -> 我调用了我的库的 start() 方法,然后在列表的每次修改时都会在我的侦听器上收到通知(当有 add/remove 个项目)。

我真的不明白如何使用 RxJava 实现这种行为,因为在发射器/订阅者的定义中订阅了监听器? 我应该在哪里声明听众?我应该在哪里退订?我应该使用什么对象?

我使用 Nucleus 开始开发,但可以切换到另一个样板或自己做一个。

下面是一些伪代码来说明我的问题:

之前

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    ...

    mMyLib.setListener(this);
    mMyLib.startDiscovery();
}

@Override
public void itemListChanged(List<Dummy> items) {
    // update the UI with the list items
}

@Override
protected void onDestroy() {
    super.onDestroy();
    mMyLib.setListener(null);
}

在我的演示器中使用 Nucleus

如果我想在我的 activity / 主持人还活着的情况下接收列表的修改,我应该在哪里退订?我是否使用了正确的语法/对象?

private static final int REQUEST_ITEMS = 1;
private PublishSubject<Integer> pageRequests = PublishSubject.create();

...

@Override
public void onCreate(Bundle savedState) {
    super.onCreate(savedState);

    restartableReplay(REQUEST_ITEMS,
            () -> Observable.create(e ->
                    {
                        mMyLib.setListener(new MyLib.Listener() {
                            @Override
                            public void itemListChanged(List<Dummy> items) {
                                Log.d(TAG, "meh itemListChanged");
                                e.onNext(items);
                                e.onComplete();
                            }
                        });
                        mMyLib.startDiscovery();
                    }
            )
            ,
            MainFragment::onItems,
            MainFragment::onNetworkError);
}

void request() {
    start(REQUEST_ITEMS);
}

你的方向是正确的,这是用 RxJava 包装异步回调的有效方法,一些评论:

  • 你在 itemListChanged 上调用 e.onComplete(),这是错误的,因为它会结束你的 Observable 序列,你可能根本不需要调用 onComplete() , 因为它永远不会结束来自外部源的通知,或者在通知真正结束时调用它一次(外部源完成生产项目),不要将它与取消订阅源混淆。
  • 对于要执行某些操作的取消订阅逻辑,您应该在 create() 中定义它,使用您的取消逻辑(mMyLib.setListener(null) 或任何其他清理资源代码)调用 e.setCancellable()
  • 在这种情况下,您似乎只有 1 个订阅者,但考虑使用 share(),以获得可以向多个订阅者进行多播的 'hot' Observable
  • 至于 Nucleus 库,据我所知 restartableReplay,将缓存你的 Observable 重放发出的项目 ,这可能是这种 'hot' 事件流是错误的(除非你需要重放上次发射或其​​他东西),此外,缓存会出现问题,因为你将失去取消订阅的能力,所以一定要在这里使用 Nucleus。

Where should I unsubscribe if I want to receive modifications of the list as long as my activity / presenter are alive? Am I even using the right syntax/ objects?

您只需在您想停止接收通知的任何地方取消订阅,是在 onStop() 还是 onDestory() 取决于您。