带 Kotlin 协同程序的房间观察数据库变化

Room with Kotlin-coroutines observe db changes

所以,我最近开始尝试协程,我从 Rxjava2 切换到协程,我还没有掌握它,但我 运行 进入了需要观察我的数据库的状态更改并更新对应的 UI。

RxJava 曾经为我提供 Flowables、Completeable 等。使用它我将能够观察到 Db 的变化。

    abstract fun insert(data: SomeData): Long

    @Query("SELECT * FROM somedata_table")
    abstract fun getData(): Flowable<List<SomeData>>

所以现在在这里我曾经订阅 getData 并且总是用来观察变化

现在进入协程,我正在使用一个暂停的函数,它的结果延迟到 return 我的回复

    @Insert(onConflict = OnConflictStrategy.IGNORE)
    abstract fun insert(data: SomeData): Long

    @Query("SELECT * FROM somedata_table")
    abstract fun getData(): List<SomeData>
suspend fun getAllSomeData():Deferred<List<SomeData>>{
        return GlobalScope.async (context= coroutineContext){
            database.myDao().getData()
        }
    }

现在我无法监听更新,协程中的 Channels 可能是正确的答案?但我不确定如何将它与 Room 一起使用。

Gradle 依赖关系:

dependencies {
    compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-reactive', version: '1.1.1'
}

房道

@Dao
interface HistoryDao : BaseDao<HistoryEntity> {

    @Query("select * from History order by time desc")
    fun observe(): Flowable<List<HistoryEntity>>

    ...
}

Interactor(下面browserHistoryInteractor(dao和Fragment/Presenter之间的层)

// To get channel of List<HistoryEntity>:
import kotlinx.coroutines.reactive.openSubscription

fun observe() = historyDao.observe().openSubscription() // convert list to Coroutines channel

Presenter/Fragment/Activity(终点(在我的例子中是生命周期感知的演示者))

import kotlinx.coroutines.Job
import kotlinx.coroutines.launch

private val compositeJob = Job() // somewhat equivalent "compositeDisposable" in rx

override fun onCreate() {
    super.onCreate()

    launch(compositeJob) { // start coroutine
        val channel = browserHistoryInteractor.observe() 
        for (items in channel) {  // waits for next list of items (suspended)
            showInView { view?.setItems(items) }
        }
    }
}

override fun onDestroy() {
    compositeJob.cancel() // as in rx you need to cancel all jobs
    super.onDestroy()
}

https://www.youtube.com/watch?v=lh2Vqt4DpHU&list=PLdb5m83JnoaBqMWF-qqhZY_01SNEhG5Qs&index=5 在 29:25

目前,有两种不同的方法可以做到这一点。第一种是使用 liveData 构建器函数。要使其正常工作,您需要将生命周期更新为 androidx.lifecycle:*:2.2.0-alpha01 或任何更新版本。 LiveData 构建器函数将用于异步调用 getData(),然后使用 emit() 发出结果。使用此方法,您会将 Room getData() 函数修改为挂起函数,并将 return 类型包装为 LiveData,替换之前使用的 Flowable。

@Query("SELECT * FROM somedata_table")
abstract suspend fun getData(): LiveData<List<SomeData>>

在您的视图模型中,您创建了一个引用您的 Room 数据库的 liveData

val someData: LiveData<SomeData> = liveData {
    val data = database.myDao().getData() 
    emit(data)
}

第二种方法是从我们的数据库中获取数据作为流。要使用它,您需要将 Room 更新到 androidx.room:room-*:2.2.0-alpha02(目前是最新的)或更新的版本。此更新使 @Query DAO 方法成为 return 类型的 Flow 如果查询中的观察表无效,returned Flow 将重新发出一组新值。使用 Channel return 类型声明 DAO 函数是错误的

@Query("SELECT * FROM somedata_table")
abstract fun getData(): Flow<List<SomeData>?>

return 类型是可空列表的流。该列表可以为 null,因为当查询没有获取数据时 Room 将 return null。

为了从流中获取数据,我们将在 Presenter/ViewModel 中使用终端运算符 collect{ }。最好在 ViewModel 中执行此操作,因为它带有 ViewModelScope。下面给出的解决方案假设我们在 ViewModel 中执行此操作,我们提供了 viewModelScope。

    fun loadData(){
        viewModelScope.launch {
            database.myDao()
               .getData()
               .distinctUntilChanged().
               .collect{
                        it?.let{ /** Update your obsevable data here **/
               }
    }

使用 Room 2.2.0 流程kotlin 协程。这是有争议的,但我不喜欢 LiveData,因为它会在 UI 线程上为您提供结果。如果您必须进行任何数据解析,则必须将所有内容推回到另一个 IO 线程。它也比直接使用 channels 更干净,因为每次您想收听时都必须进行额外的 openSubscription().consumeEach { .. } 调用事件。

流方法需要以下版本:

// 这个版本在他们的非实验版本中使用协程和流程

org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.2
androidx.room:room-runtime:2.2.0
androidx.room:room-compiler:2.2.0

道:

@Dao
interface MyDao {
     @Query("SELECT * FROM somedata_table")
     fun getData(): Flow<List<SomeData>>
}

class做观察:

launch {
   dao.getData().collect { data ->
    //handle data here
   }
}

如果您的调用 class 本身不是 CoroutineScope,则您必须在上下文中调用 launch。这可以是 GlobalScope 或您创建的其他 class。假设我们在 Activity class.

中,我在这里使用 lifecycleScope
lifecycleScope.launch {
   dao.getData().collect { data ->
    //handle data here
   }
}

collect lambda 将接收到 table 的每个更新,就像 Rx onNext 调用一样。