关于 Android 应用程序中的线程、CompositeDisposable 和调度程序的 RxJava 问题

RxJava Questions About Threads, CompositeDisposable and Scheduler in an Android Application

我的代码是这样的:

timingObservable = getCurrentModule()
                .zipWith(Observable.interval(200, TimeUnit.MILLISECONDS), (currentModule, interval) -> currentModule)
                .repeat()
                .distinctUntilChanged()
                .getModuleDataFromDb()

compositeDisposable.add(timingObservable
        .subscribeOn(Schedulers.io())
        .subscribe(next -> {
            .
            .
            .
        }));

public Observable<String> getCurrentModule() {
    return Observable.fromCallable(() -> {
        String currentModule = "";
        // doing some none database work and computation
        .
        .
        .
        return currentModule;
    }
}

应该定期检查当前模块,如果模块发生变化,从数据库中获取一些数据。我有几个问题:

回答问题:

In the RxThreadFactory class of RxJava and in the newThread() method we have the line t.setDaemon(true), so is it true that all RxJava threads are daemon threads? So, they are alive as long as a component of app is alive and the app process is still running, right?

在Java Thread::setDaemon(true) 中只是意味着一旦所有非守护线程都完成,这些“守护”线程将被放弃,JVM 将关闭。作为参考,android“主”线程不是守护线程,但有一个 Looper。守护线程可以自然地完成并且不会阻止进程退出。你不应该依赖这种机制来完成长时间的 运行 任务并使用前台服务 and/or WorkManager,Rx 线程池可以持续到它们在 运行 中的进程,除非他们绑定的 Executor 已明确关闭。

I am adding the disposable return of subscribe() to a compositeDisposable and call dispose in onDestory() of my Service/Activity classes. Lets say in a scenario there are one service and one activity and compositeDisposable belongs to the service. What happens to those disposables when the service gets killed without onDestroy() being called and activity remains alive. I mean, since the compositeDisposable object is destoryed, is it possible that I lose the ability to dispose disposables? Should I hold application wide instance of CompositeDisposable?

服务只会在没有生命周期回调的情况下被销毁,如果 1) Android 终止进程以回收资源,在这种情况下它与清除资源无关,或者 2) 程序崩溃,然后再次崩溃不需要资源清理

In terms of performance, which one is recommended in this code? subscribeOn(Schedulers.io()) or subscribeOn(Schedulers.computation()) since Observable.interval uses computation scheduler by default and we have DB work too.

Schedulers.io 是一个无界线程池,而 Schedulers.computation() 是有界的(我相信是一个包含 8 个线程的池)。性能可能会有所不同,但在大多数情况下差异可以忽略不计。一种情况 Schedulers.computation() 可能会更慢,如果您连续使用此线程池有很多并发性,这意味着您正在等待线程变为空闲。相比之下,Schedulers.io() 可能需要从它使用的 ThreadFactory 创建新线程的前期成本。但是它将尝试使用其池中的现有线程。需要指标才能真正看到各个用例的任何性能差异。根据一般规则,io() 建议像 file/database/networking 这样的工作应该使用 Schedulercomputation 像 timers/algorithms 这样的工作应该使用后者。

Any suggestions to improve the above code for periodic tasks?

正如已经建议的轮询,通常不是一个好主意,从概念上讲,Reactive Streams 作为 pub/sub 观察者模式更有用。但是,您似乎对可以修改的内容有一些限制。对于提供的代码,上下文太少,无法真正提供任何具体的改进,只有建议,最终将以“我不能那样做,因为......”结束。