如何在创建的 Observable 中正确控制已定义调度程序的排放

How to correctly control emissions for defined Scheduler in created Observable

我正在使用 RxJava2,假设我有这个 Observable:

Observable
   .create(emitter -> 
      SomeDependency.registerCallback(data -> emitter.onNext(data))
   )
   .subscribeOn(Schedulers.io());

它观察一些异步逻辑,然后发出从中得到的任何东西。重要的是要知道,在已注册的回调中,数据是在 SomeDependency 处理的线程上传递的。结果,这导致所有 emitter 向下游的排放都在该线程上传送,忽略定义的 Scheduler.

中,@akarnokd 提供了一个提示,指示使用 Scheduler.Worker 将数据重定向到正确线程的方法。这种方法的修改示例如下所示:

Observable
   .create(emitter -> {
      final Worker worker = Schedulers.trampoline().createWorker();
      emitter.setDisposable(worker);
      SomeDependency.registerCallback(data -> 
         worker.schedule(() -> emitter.onNext(data))
      )
   })
   .subscribeOn(Schedulers.io());

NOTE: trampoline() creates a Scheduler for a current thread. In our case the io() thread as we have defined that one to create our Observable on.

事实是,在创建此类 Observable 时,您通常不仅需要 registerCallback(),还需要 unregisterCallback()。在常见的情况下,您将 unregisterCallback() 放入 emitterDisposable 中。但是,如您所见,我们的 emitter 已经有一个 Disposable,无法设置另一个。如果要设置第二个 Disposable,则取消设置并释放前一个。

请问您对如何解决这个问题有什么想法吗?

我想你需要的是CompositeDisposable

A disposable container that can hold onto multiple other disposables.