如何在 WorkManager 中配置 Rx?

How to dispose Rx in WorkManager?

我实现了 AlarmManager 以在用户向任务添加截止日期时发送通知。然而,当用户关闭设备时,所有警报都将丢失。现在我正在更新 BroadcastReceiver 以接收 android.intent.action.BOOT_COMPLETED 并重新安排为每个任务设置的所有警报。

我的第一次尝试是在 BroadcastReceiver 中获取一个包含所有截止日期高于当前时间的任务的 Rx Single,然后重新安排所有警报。问题是一旦 BroadcastReceiver 没有生命周期,我就无法处理 Observable。另外,.

在我的研究中,IntentService 是这个案例的一个很好的解决方案,但我正在进入新的 WorkManager 库,OneTimeWorkRequest 看起来像一个简单的好方法解决方案。

正在调用 Worker 并正确执行,但我无法处理 Observable,因为从未调用 onStopped 方法。

下面是实现,基于:

class TaskAlarmWorker(context: Context, params: WorkerParameters) :
    Worker(context, params), KoinComponent {

    private val daoRepository: DaoRepository by inject()

    private val compositeDisposable = CompositeDisposable()

    override fun doWork(): Result {
        Timber.d("doWork")

        val result = LinkedBlockingQueue<Result>()

        val disposable =
            daoRepository.getTaskDao().getAllTasks().applySchedulers().subscribe(
            { result.put(Result.SUCCESS) },
            { result.put(Result.FAILURE) }
        )

        compositeDisposable.add(disposable)

        return try {
            result.take()
        } catch (e: InterruptedException) {
            Result.RETRY
        }
    }

    override fun onStopped(cancelled: Boolean) {
        Timber.d("onStopped")
        compositeDisposable.clear()
    }
}

你可以用onStoped()方法清除它然后compositeDisposable.dispose();

然后调用super.onStoped()

  • WorkManager 是一个很好的解决方案(甚至可能是最好的解决方案)
  • 您应该使用 RxWorker 而不是 Worker。这是一个例子:

    1. 去实现它。添加 androidx.work:work-rxjava2:$work_version 作为依赖项到您的 build.gradle 文件。

    2. RxWorker class 扩展您的 class,然后覆盖 createWork() 函数。

class TaskAlarmWorker(context: Context, params: WorkerParameters) :
    RxWorker(context, params), KoinComponent {

    private val daoRepository: DaoRepository by inject()  

    override fun createWork(): Single<Result> {
        Timber.d("doRxWork")

     return daoRepository.getTaskDao().getAllTasks()
                .doOnSuccess { /* process result somehow */ }
                .map { Result.success() }
                .onErrorReturn { Result.failure() }          

    }

}

Important notes about RxWorker:

  • The createWork() method is called on the main thread but returned single is subscribed on the background thread.
  • You don’t need to worry about disposing the Observer since RxWorker will dispose it automatically when the work stops.
  • Both returning Single with the value Result.failure() and single with an error will cause the worker to enter the failed state.
  • You can override onStopped function to do more.

阅读更多: