Room 在 Android 上是否在后台线程上调用了订阅者的 onComplete 方法?

Is the onComplete method of a Subscriber called on a background thread by Room on Android?

我正在使用 Room 实现一个 Android 应用程序。目前,我已经将 Guave 和 Jetpack 库包含到我的其他部分的项目中。但是,如果这有助于解决我的问题,我不介意包含另一个库(即 JxJava)。但是,我没有找到任何权威文档来解决我的问题。

我需要对数据执行异步、可观察的查询,并且观察处理程序必须 运行 在后台线程上,因为我必须对结果进行一些代价高昂的 post 处理.

对于 Java,有两种与房间进行异步交互的选项:使用 Rxjava 或 Guava/LiveData(参见 Write asynchronous DAO queries - Language and framework options

如果我想要一个异步的一次性(与可观察的相反)查询并采用Guava/LiveData,那么API returns ListenableFuture(参见 Write asynchronous DAO queries - Write asynchronous one-shot queries). The ListenableFuture takes a Runnable as listener which is associated to an Executator which is used to dispatch the listener when the data changes (see ListenableFuture#addListener)。这很好,因为我的应用程序中已经有一个用于后台任务的中央 ThreadPool 执行程序。但是,这是我不想要的一次性查询。

异步、可观察 查询与 Guava/LiveData、returns 和 LiveData(参见 Write asynchronous DAO queries - Write asynchronous observable queries。这是一个可惜因为LiveDataObserveronChange方法总是在主线程(GUI)上执行。这是LiveData的设计原则,因为他们应该更新 UI,但这不是我需要的。当然,我可以在主线程上使用 onChange 方法,在我的后台执行程序上调度 Runnable并再次跳入后台,但这似乎涉及后台和主线程之间不必要的上下文切换。

所以我在考虑使用RxJava。异步、可观察 查询与 RxJava、returns 和 Flowable 相结合。可以使用 Consumer 订阅 Flowable 。但是,当 Flowable 发出一个新值时,我没有找到任何关于调度 Consumeraccept 方法的线程的任何信息。根据Flowable的创建者负责的RxJava文档,因为RxJava只定义了抽象接口,并没有规定具体的实现。在手头的案例中,Flowable 的创建者是 Room 库,但似乎没有记录,Room 为 Flowable.

使用了哪个线程

Room 是否使用主线程来更新其 Flowable? (那将是糟糕的,并且与 LiveData 相比没有任何改进)。 Room 是否使用与数据库查询相同的后台线程来更新其 Flowable? (那不会完全糟糕,但仍然可以改进。)或者 Room 是否派生了一个仅用于更新其 Flowable 的新线程? (好)

奖励:为了保持分叉和销毁线程的数量较少,如果 Room 可以在整个应用程序范围内使用我的 ThreadPoolExecutor,我将不胜感激,不仅用于 Flowables,而且用于它的查询和所有其他异步任务。

文档中找不到的东西,总能在源代码中找到code:D

假设这样的查询:

@Query("SELECT * FROM table")
fun query(): Flowable<List<Entity>>

Room 的注释处理器将生成一些代码,其中包含:

 return RxRoom.createFlowable(__db, true, new String[]{"table"}, new Callable<List<Entity>>() {... /*not important here*/}

检查此方法的实现:

Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
    final Maybe<T> maybe = Maybe.fromCallable(callable);
    return createFlowable(database, tableNames)
            .subscribeOn(scheduler)
            .unsubscribeOn(scheduler)
            .observeOn(scheduler)
            .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
                @Override
                public MaybeSource<T> apply(Object o) throws Exception {
                    return maybe;
                }
            });

所以他们使用了一个从执行器创建的调度器。 你可以在这里找到关于这个执行者的一些信息 androidx.room.RoomDatabase.Builder#setQueryExecutor:

     * When both the query executor and transaction executor are unset, then a default
     * {@code Executor} will be used. The default {@code Executor} allocates and shares threads
     * amongst Architecture Components libraries. If the query executor is unset but a
     * transaction executor was set, then the same {@code Executor} will be used for queries.
     * <p>
     * For best performance the given {@code Executor} should be bounded (max number of threads
     * is limited).

例子

如下代码

    dao.query().subscribe({
        Log.d("CheckThread1", "onSuccess ${Thread.currentThread().name}")
    }, {
        Log.d("CheckThread1", "onError  ${Thread.currentThread().name}")
    })

    

将导致:

D/CheckThread1: onSuccess arch_disk_io_0

如您所见,此操作未在主线程上处理。

是的,您可以在构建数据库时设置自己的执行程序:

Room.databaseBuilder(
                context.applicationContext,
                Database::class.java,
                "main.db"
            )
                .setTransactionExecutor(.../*your executor*/)
                .setQueryExecutor(.../*your executor*/)
                .build()