Android 房间中的事务 w/ RxJava2
Transactions in Android Room w/ RxJava2
我的应用程序的一个要求是允许用户完成多个步骤,然后在完成后根据每个步骤中的条目将值写入数据库。 UI 中的每个步骤都可能有助于将需要写入数据库的操作。数据可能位于多个表中,并且属于这些表中的不同行。如果任何数据库操作失败,那么整个操作应该失败。
我最初考虑将所有数据加载到内存中,对其进行操作,然后简单地在每个可能的实体中调用更新方法(使用 REPLACE 的冲突策略),但可能会有非常大的数量内存中的数据。
我想我可以 assemble 一个列表,其中显示的每个 Fragment 贡献一个或多个 Completables,然后在结束时使用 Completable.concat() 顺序执行这些UI 流程。它看起来像下面这样:
Completable one = Completable.fromAction(() -> Log.w(LOG_TAG, "(1)")).delay(1, TimeUnit.SECONDS);
Completable two = Completable.fromAction(() -> Log.w(LOG_TAG, "(2)")).delay(2, TimeUnit.SECONDS);
Completable three = Completable.fromAction(() -> Log.w(LOG_TAG, "(3)")).delay(3, TimeUnit.SECONDS);
Completable four = Completable.fromAction(() -> Log.w(LOG_TAG, "(4)")).delay(3, TimeUnit.SECONDS);
Completable.concatArray(one, two, three, four)
.doOnSubscribe(__ -> {
mRoomDatabase.beginTransaction();
})
.doOnComplete(() -> {
mRoomDatabase.setTransactionSuccessful();
})
.doFinally(() -> {
mRoomDatabase.endTransaction();
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe();
Completables 实际上是 Room DAO insert/update/delete 方法的包装器。我可能还会在完成后执行 UI 操作,这就是我在主线程上观察的原因。
当我执行这段代码时,我得到了这些日志:
W/MyPresenter: Begin transaction.
W/MyPresenter: (1)
W/MyPresenter: (2)
W/MyPresenter: (3)
W/MyPresenter: (4)
W/MyPresenter: Set transaction successful.
W/MyPresenter: End transaction.
W/System.err: java.lang.IllegalStateException: Cannot perform this operation because there is no current transaction.
W/System.err: at android.database.sqlite.SQLiteSession.throwIfNoTransaction(SQLiteSession.java:915)
W/System.err: at android.database.sqlite.SQLiteSession.endTransaction(SQLiteSession.java:398)
W/System.err: at android.database.sqlite.SQLiteDatabase.endTransaction(SQLiteDatabase.java:524)
W/System.err: at android.arch.persistence.db.framework.FrameworkSQLiteDatabase.endTransaction(FrameworkSQLiteDatabase.java:88)
W/System.err: at android.arch.persistence.room.RoomDatabase.endTransaction(RoomDatabase.java:220)
W/System.err: at ...lambda$doTest$MyPresenter(MyPresenter.java:490)
为什么在我到达 doFinally 时交易已经消失?我也欢迎任何关于这种方法的质量或可行性的评论,因为我对 RxJava 和 Room 还很陌生。
通过记录当前线程并仔细阅读 Android 开发人员 documentation 我想我可能终于明白我做错了什么了。
1) 事务必须发生在同一个线程上。这就是为什么它告诉我没有交易;我显然在线程之间跳动。
2) doOnSubscribe、doOnComplete 和 doFinally 方法是 side effects 因此不是实际流本身的一部分。这意味着它们不会出现在我 subscribe 上的调度程序上。它们将发生在我 observe 上的调度程序上。
3) 因为我想在完成后在UI线程上接收结果,但是希望副作用发生在后台线程上,所以我需要改变我观察的位置。
Completable.concatArray(one, two, three, four)
.observeOn(Schedulers.single()) // OFF UI THREAD
.doOnSubscribe(__ -> {
Log.w(LOG_TAG, "Begin transaction. " + Thread.currentThread().toString());
mRoomDatabase.beginTransaction();
})
.doOnComplete(() -> {
Log.w(LOG_TAG, "Set transaction successful." + Thread.currentThread().toString());
mRoomDatabase.setTransactionSuccessful();
})
.doFinally(() -> {
Log.w(LOG_TAG, "End transaction." + Thread.currentThread().toString());
mRoomDatabase.endTransaction();
})
.subscribeOn(Schedulers.single())
.observeOn(AndroidSchedulers.mainThread()) // ON UI THREAD
.subscribeWith(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
Log.w(LOG_TAG, "onSubscribe." + Thread.currentThread().toString());
}
@Override
public void onComplete() {
Log.w(LOG_TAG, "onComplete." + Thread.currentThread().toString());
}
@Override
public void onError(Throwable e) {
Log.e(LOG_TAG, "onError." + Thread.currentThread().toString());
}
});
日志语句现在看起来像这样:
W/MyPresenter: onSubscribe.Thread[main,5,main]
W/MyPresenter: Begin transaction. Thread[RxSingleScheduler-1,5,main]
W/MyPresenter: (1)
W/MyPresenter: (2)
W/MyPresenter: (3)
W/MyPresenter: (4)
W/MyPresenter: Set transaction successful.Thread[RxSingleScheduler-1,5,main]
W/MyPresenter: End transaction.Thread[RxSingleScheduler-1,5,main]
W/MyPresenter: onComplete.Thread[main,5,main]
我相信这会完成我所追求的,但 step-by-step Room-based RxJava Completables 的汇编是否会成功还有待观察。我会留意任何 comments/answers 并可能会向后代报告。
我设法以这种方式进行允许来自不同表的操作的事务(例如使用 Dagger 注入数据库):
class RxRoomTransaction @Inject constructor(private val db : AppDatabase) {
fun run(fn : () -> Unit) : Completable {
return Completable.fromAction {
try {
db.beginTransaction()
fn.invoke()
db.setTransactionSuccessful()
} catch (t : Throwable) {
// Catch everything, including InterruptedException which is invoked on dispose
} finally {
try {
// Double check to catch possible exception caused by endTransaction (shouldn't occur)
db.endTransaction()
} catch (t : Throwable) {
}
}
}
}
}
这样称呼:
rxRoomTransaction.run {
dao1.insertAll(data1)
dao2.insert(data2)
dao3.clear()
}
DAO 方法不 返回 RxJava 对象:
@Dao
interface Dao3 {
@Query("DELETE FROM table3")
fun clear()
}
我的应用程序的一个要求是允许用户完成多个步骤,然后在完成后根据每个步骤中的条目将值写入数据库。 UI 中的每个步骤都可能有助于将需要写入数据库的操作。数据可能位于多个表中,并且属于这些表中的不同行。如果任何数据库操作失败,那么整个操作应该失败。
我最初考虑将所有数据加载到内存中,对其进行操作,然后简单地在每个可能的实体中调用更新方法(使用 REPLACE 的冲突策略),但可能会有非常大的数量内存中的数据。
我想我可以 assemble 一个列表,其中显示的每个 Fragment 贡献一个或多个 Completables,然后在结束时使用 Completable.concat() 顺序执行这些UI 流程。它看起来像下面这样:
Completable one = Completable.fromAction(() -> Log.w(LOG_TAG, "(1)")).delay(1, TimeUnit.SECONDS);
Completable two = Completable.fromAction(() -> Log.w(LOG_TAG, "(2)")).delay(2, TimeUnit.SECONDS);
Completable three = Completable.fromAction(() -> Log.w(LOG_TAG, "(3)")).delay(3, TimeUnit.SECONDS);
Completable four = Completable.fromAction(() -> Log.w(LOG_TAG, "(4)")).delay(3, TimeUnit.SECONDS);
Completable.concatArray(one, two, three, four)
.doOnSubscribe(__ -> {
mRoomDatabase.beginTransaction();
})
.doOnComplete(() -> {
mRoomDatabase.setTransactionSuccessful();
})
.doFinally(() -> {
mRoomDatabase.endTransaction();
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe();
Completables 实际上是 Room DAO insert/update/delete 方法的包装器。我可能还会在完成后执行 UI 操作,这就是我在主线程上观察的原因。
当我执行这段代码时,我得到了这些日志:
W/MyPresenter: Begin transaction.
W/MyPresenter: (1)
W/MyPresenter: (2)
W/MyPresenter: (3)
W/MyPresenter: (4)
W/MyPresenter: Set transaction successful.
W/MyPresenter: End transaction.
W/System.err: java.lang.IllegalStateException: Cannot perform this operation because there is no current transaction.
W/System.err: at android.database.sqlite.SQLiteSession.throwIfNoTransaction(SQLiteSession.java:915)
W/System.err: at android.database.sqlite.SQLiteSession.endTransaction(SQLiteSession.java:398)
W/System.err: at android.database.sqlite.SQLiteDatabase.endTransaction(SQLiteDatabase.java:524)
W/System.err: at android.arch.persistence.db.framework.FrameworkSQLiteDatabase.endTransaction(FrameworkSQLiteDatabase.java:88)
W/System.err: at android.arch.persistence.room.RoomDatabase.endTransaction(RoomDatabase.java:220)
W/System.err: at ...lambda$doTest$MyPresenter(MyPresenter.java:490)
为什么在我到达 doFinally 时交易已经消失?我也欢迎任何关于这种方法的质量或可行性的评论,因为我对 RxJava 和 Room 还很陌生。
通过记录当前线程并仔细阅读 Android 开发人员 documentation 我想我可能终于明白我做错了什么了。
1) 事务必须发生在同一个线程上。这就是为什么它告诉我没有交易;我显然在线程之间跳动。
2) doOnSubscribe、doOnComplete 和 doFinally 方法是 side effects 因此不是实际流本身的一部分。这意味着它们不会出现在我 subscribe 上的调度程序上。它们将发生在我 observe 上的调度程序上。
3) 因为我想在完成后在UI线程上接收结果,但是希望副作用发生在后台线程上,所以我需要改变我观察的位置。
Completable.concatArray(one, two, three, four)
.observeOn(Schedulers.single()) // OFF UI THREAD
.doOnSubscribe(__ -> {
Log.w(LOG_TAG, "Begin transaction. " + Thread.currentThread().toString());
mRoomDatabase.beginTransaction();
})
.doOnComplete(() -> {
Log.w(LOG_TAG, "Set transaction successful." + Thread.currentThread().toString());
mRoomDatabase.setTransactionSuccessful();
})
.doFinally(() -> {
Log.w(LOG_TAG, "End transaction." + Thread.currentThread().toString());
mRoomDatabase.endTransaction();
})
.subscribeOn(Schedulers.single())
.observeOn(AndroidSchedulers.mainThread()) // ON UI THREAD
.subscribeWith(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
Log.w(LOG_TAG, "onSubscribe." + Thread.currentThread().toString());
}
@Override
public void onComplete() {
Log.w(LOG_TAG, "onComplete." + Thread.currentThread().toString());
}
@Override
public void onError(Throwable e) {
Log.e(LOG_TAG, "onError." + Thread.currentThread().toString());
}
});
日志语句现在看起来像这样:
W/MyPresenter: onSubscribe.Thread[main,5,main]
W/MyPresenter: Begin transaction. Thread[RxSingleScheduler-1,5,main]
W/MyPresenter: (1)
W/MyPresenter: (2)
W/MyPresenter: (3)
W/MyPresenter: (4)
W/MyPresenter: Set transaction successful.Thread[RxSingleScheduler-1,5,main]
W/MyPresenter: End transaction.Thread[RxSingleScheduler-1,5,main]
W/MyPresenter: onComplete.Thread[main,5,main]
我相信这会完成我所追求的,但 step-by-step Room-based RxJava Completables 的汇编是否会成功还有待观察。我会留意任何 comments/answers 并可能会向后代报告。
我设法以这种方式进行允许来自不同表的操作的事务(例如使用 Dagger 注入数据库):
class RxRoomTransaction @Inject constructor(private val db : AppDatabase) {
fun run(fn : () -> Unit) : Completable {
return Completable.fromAction {
try {
db.beginTransaction()
fn.invoke()
db.setTransactionSuccessful()
} catch (t : Throwable) {
// Catch everything, including InterruptedException which is invoked on dispose
} finally {
try {
// Double check to catch possible exception caused by endTransaction (shouldn't occur)
db.endTransaction()
} catch (t : Throwable) {
}
}
}
}
}
这样称呼:
rxRoomTransaction.run {
dao1.insertAll(data1)
dao2.insert(data2)
dao3.clear()
}
DAO 方法不 返回 RxJava 对象:
@Dao
interface Dao3 {
@Query("DELETE FROM table3")
fun clear()
}