如何在创建的 Observable 中正确控制已定义调度程序的排放
How to correctly control emissions for defined Scheduler in created Observable
我正在使用 RxJava2,假设我有这个 Observable:
Observable
.create(emitter ->
SomeDependency.registerCallback(data -> emitter.onNext(data))
)
.subscribeOn(Schedulers.io());
它观察一些异步逻辑,然后发出从中得到的任何东西。重要的是要知道,在已注册的回调中,数据是在 SomeDependency
处理的线程上传递的。结果,这导致所有 emitter
向下游的排放都在该线程上传送,忽略定义的 Scheduler
.
在 中,@akarnokd 提供了一个提示,指示使用 Scheduler.Worker
将数据重定向到正确线程的方法。这种方法的修改示例如下所示:
Observable
.create(emitter -> {
final Worker worker = Schedulers.trampoline().createWorker();
emitter.setDisposable(worker);
SomeDependency.registerCallback(data ->
worker.schedule(() -> emitter.onNext(data))
)
})
.subscribeOn(Schedulers.io());
NOTE: trampoline()
creates a Scheduler
for a current thread. In our case the io()
thread as we have defined that one to create our Observable on.
事实是,在创建此类 Observable 时,您通常不仅需要 registerCallback()
,还需要 unregisterCallback()
。在常见的情况下,您将 unregisterCallback()
放入 emitter
的 Disposable
中。但是,如您所见,我们的 emitter
已经有一个 Disposable
,无法设置另一个。如果要设置第二个 Disposable
,则取消设置并释放前一个。
请问您对如何解决这个问题有什么想法吗?
我想你需要的是CompositeDisposable
。
A disposable container that can hold onto multiple other disposables.
我正在使用 RxJava2,假设我有这个 Observable:
Observable
.create(emitter ->
SomeDependency.registerCallback(data -> emitter.onNext(data))
)
.subscribeOn(Schedulers.io());
它观察一些异步逻辑,然后发出从中得到的任何东西。重要的是要知道,在已注册的回调中,数据是在 SomeDependency
处理的线程上传递的。结果,这导致所有 emitter
向下游的排放都在该线程上传送,忽略定义的 Scheduler
.
在 Scheduler.Worker
将数据重定向到正确线程的方法。这种方法的修改示例如下所示:
Observable
.create(emitter -> {
final Worker worker = Schedulers.trampoline().createWorker();
emitter.setDisposable(worker);
SomeDependency.registerCallback(data ->
worker.schedule(() -> emitter.onNext(data))
)
})
.subscribeOn(Schedulers.io());
NOTE:
trampoline()
creates aScheduler
for a current thread. In our case theio()
thread as we have defined that one to create our Observable on.
事实是,在创建此类 Observable 时,您通常不仅需要 registerCallback()
,还需要 unregisterCallback()
。在常见的情况下,您将 unregisterCallback()
放入 emitter
的 Disposable
中。但是,如您所见,我们的 emitter
已经有一个 Disposable
,无法设置另一个。如果要设置第二个 Disposable
,则取消设置并释放前一个。
请问您对如何解决这个问题有什么想法吗?
我想你需要的是CompositeDisposable
。
A disposable container that can hold onto multiple other disposables.