使用 RxJava 处理长 运行 任务

Handling long running tasks with RxJava

我正在尝试迁移一个向服务器发送消息的 AsyncTask,以使用 RxJava。大致来说,该任务执行以下操作:

1) 创建将要发送的消息(保存到数据库)
2) 向用户显示消息(状态 'sending')
3) 将消息发送到服务器(下面的代码片段)
4) 将消息标记为已发送或失败(保存到数据库)
5) 更新 UI

我已经创建了所需的 Rx 链,部分如下所示:

 public Observable<Message> sendMessage(Message message) {
     return mApiClient.sendMessage(message)
         .doOnNext(sentMessage -> mDatabase.synchroniseMessage(sentMessage))
         .doOnError(e -> {
             message.setState(FAILED);
             mDatabase.synchroniseMessage(message));
         })
         .onErrorReturn(e -> Observable.just(message));

当我订阅上面的内容时,我得到了 Disposable。通常我会将它添加到 CompositeDisposable 对象和 clear 该对象,然后用户已移动到不同的 view(即 fragment)。但是,在这种情况下,我需要保留 运行 此任务以确保本地数据库相应地更新了任务结果。

处理这种情况最合适的方法是什么?我可以简单地不将 Disposable 添加到我的 CompositeDisposable 对象中,因此它不会被取消订阅,但感觉它可能会导致问题。

P.S。向用户显示更新是通过观察 SQLite table 中的数据来处理的。这些事件由 synchroniseMessage 方法触发。这是一个不同的订阅,我将直接取消订阅,所以这不是问题的一部分。

一个人一旦对它不再感兴趣就处理 Disposable

在您的情况下,无论用户是否导航到另一个屏幕,您仍然对流感兴趣,这意味着您无法取消订阅。这意味着您不能将其添加到 CompositeDisposable.

这会导致这样一种情况,当你的 Activity 不能被垃圾回收时,因为你的 Subscription 对它的隐式引用,因此你正在造成内存泄漏情况。

如果您有这样的用例,我认为您必须在组件上执行该请求,该组件将 activity 生命周期独立,例如 Service.