Rxjava2 just method - 如何 运行 在另一个线程上插入房间?
Rxjava2 just method - how to run room insertion on another thread?
我有一个房间持久数据库插入方法,如下所示:
@Dao
public interface CountriesDao{
@Insert(onConflict = REPLACE)
List<Long> addCountries(List<CountryModel> countryModel);
}
我意识到这不能在主线程上 运行。这是我定义数据库的方式:
Room.inMemoryDatabaseBuilder(context.getApplicationContext(), MyDatabase.class).build();
我正在尝试使用 rxjava2,这样我就不会 运行 在主线程上。我创建了以下方法:
public void storeCountries(List<CountryModel> countriesList) {
Observable.just(db.countriesDao().addCountries(countriesList))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new DefaultSubscriber<List<Long>>(){
@Override
public void onSubscribe(@NonNull Disposable d) {
super.onSubscribe(d);
}
@Override
public void onNext(@NonNull List<Long> longs) {
super.onNext(longs);
Timber.d("insert countries transaction complete");
}
@Override
public void onError(@NonNull Throwable e) {
super.onError(e);
Timber.d("error storing countries in db"+e);
}
@Override
public void onComplete() {
Timber.d("insert countries transaction complete");
}
});
}
对我来说,现在显然 运行正在另一个线程上。不是主线程,但是当我 运行 这段代码时,我得到以下错误:
完整的堆栈跟踪如下。为什么会这样?
Process: com.mobile.myapp.staging, PID: 12990
java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.
Caused by: java.lang.IllegalStateException: Cannot access database on
the main thread since it may potentially lock the UI for a long period
of time.
at
io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:111)
at android.os.Handler.handleCallback(Handler.java:751)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:154)
at android.app.ActivityThread.main(ActivityThread.java:6077)
at java.lang.reflect.Method.invoke(Native Method)
at
com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:866)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:756)
Caused by: java.lang.IllegalStateException: Cannot access database on
the main thread since it may potentially lock the UI for a long period
of time.
at
android.arch.persistence.room.RoomDatabase.assertNotMainThread(RoomDatabase.java:138)
at
android.arch.persistence.room.RoomDatabase.beginTransaction(RoomDatabase.java:185)
at
com.mobile.myapp.data.room.dao.CountriesDao_Impl.addCountries(CountriesDao_Impl.java:165)
at
com.mobile.myapp.data.repositories.CountryRepository.storeCountries(CountryRepository.java:42)
at
com.mobile.myapp.UI.mvp.Presenters.SignUpPresenter.cacheCountries(SignUpPresenter.java:40)
at
com.mobile.myapp.UI.mvp.Presenters.SignUpPresenter$CountriesSubscriber.onNext(SignUpPresenter.java:60)
at
com.mobile.myapp.UI.mvp.Presenters.SignUpPresenter$CountriesSubscriber.onNext(SignUpPresenter.java:49)
at
io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.drainNormal(ObservableObserveOn.java:200)
at
io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.run(ObservableObserveOn.java:252)
at
io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:109)
at android.os.Handler.handleCallback(Handler.java:751)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:154)
at android.app.ActivityThread.main(ActivityThread.java:6077)
at java.lang.reflect.Method.invoke(Native Method)
at
com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:866)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:756)
不重要,但如果你需要知道 defaultSubscriber class 是什么样子,它是:
DefaultSubscriber.java
public class DefaultSubscriber<T> implements Observer<T> {
Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull T t) {
}
@Override
public void onError(@NonNull Throwable e) {
Timber.e(e);
}
@Override
public void onComplete() {
}
public void unsubscribe(){
if(disposable!=null && !disposable.isDisposed()){
disposable.dispose();
}
}
}
这是一个常见的错误:just()
不会在括号内执行 "code",因为 just
取值,而不是计算。你需要 fromCallable
:
Observable.fromCallable(() -> db.countriesDao().addCountries(countriesList))
你也可以使用 Single observable
Single.create(new SingleOnSubscribe<List<Long>>() {
@Override
public void subscribe(SingleEmitter<List<Long>> emitter) throws Exception {
try {
List<Long> ids = db.countriesDao().addCountries(countriesList);
emitter.onSuccess(ids);
} catch (Throwable t) {
emitter.onError(t);
}
}})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribeWith(new DisposableSingleObserver<Long>() {
@Override
public void onSuccess(List<Long> ids) {
}
@Override
public void onError(Throwable e) {
}
});
更好的是,您可以使用 Completable
。其描述:表示一个没有任何值的计算,但仅指示完成或异常。
Completable.fromAction(() -> db.countriesDao().addCountries(list)).subscribe();
Note: Room doesn't support database access on the main thread unless
you've called allowMainThreadQueries() on the builder because it might
lock the UI for a long period of time. Asynchronous queries - queries
that return instances of LiveData or Flowable are exempt from this
rule because they asynchronously run the query on a background thread
when needed.
所以你的代码可以是这样的
Completable.fromAction(() -> db.countriesDao()
.addCountries(list))
.subscribeOn(Schedulers.io())
.subscribe();
从room room 2.1.0-alpha02
,插入时可以使用(Completeable
,Single
,Maybe
)(
https://medium.com/androiddevelopers/room-rxjava-acb0cd4f3757)
例子
@Dao
interface UserDao{
@Insert
Completable insert(final User user); // currently, we must put final before user variable or you will get error when compile
}
正在使用
db.userDao().insert(user).subscribeOn(Schedulers.io()).subscribe(new Action() {
@Override
public void run() throws Exception {
// success
}
}, new Consumer < Throwable > () {
@Override
public void accept(Throwable throwable) throws Exception {
// error
}
});
确保你已经添加了两个依赖项,
“实施 'io.reactivex.rxjava3:rxjava:3.0.0'
实施 'io.reactivex.rxjava3:rxandroid:3.0.0'"
我有一个房间持久数据库插入方法,如下所示:
@Dao
public interface CountriesDao{
@Insert(onConflict = REPLACE)
List<Long> addCountries(List<CountryModel> countryModel);
}
我意识到这不能在主线程上 运行。这是我定义数据库的方式:
Room.inMemoryDatabaseBuilder(context.getApplicationContext(), MyDatabase.class).build();
我正在尝试使用 rxjava2,这样我就不会 运行 在主线程上。我创建了以下方法:
public void storeCountries(List<CountryModel> countriesList) {
Observable.just(db.countriesDao().addCountries(countriesList))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new DefaultSubscriber<List<Long>>(){
@Override
public void onSubscribe(@NonNull Disposable d) {
super.onSubscribe(d);
}
@Override
public void onNext(@NonNull List<Long> longs) {
super.onNext(longs);
Timber.d("insert countries transaction complete");
}
@Override
public void onError(@NonNull Throwable e) {
super.onError(e);
Timber.d("error storing countries in db"+e);
}
@Override
public void onComplete() {
Timber.d("insert countries transaction complete");
}
});
}
对我来说,现在显然 运行正在另一个线程上。不是主线程,但是当我 运行 这段代码时,我得到以下错误:
完整的堆栈跟踪如下。为什么会这样?
Process: com.mobile.myapp.staging, PID: 12990
java.lang.IllegalStateException: Fatal Exception thrown on Scheduler. Caused by: java.lang.IllegalStateException: Cannot access database on the main thread since it may potentially lock the UI for a long period of time. at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:111) at android.os.Handler.handleCallback(Handler.java:751) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:154) at android.app.ActivityThread.main(ActivityThread.java:6077) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:866) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:756) Caused by: java.lang.IllegalStateException: Cannot access database on the main thread since it may potentially lock the UI for a long period of time. at android.arch.persistence.room.RoomDatabase.assertNotMainThread(RoomDatabase.java:138) at android.arch.persistence.room.RoomDatabase.beginTransaction(RoomDatabase.java:185) at com.mobile.myapp.data.room.dao.CountriesDao_Impl.addCountries(CountriesDao_Impl.java:165) at com.mobile.myapp.data.repositories.CountryRepository.storeCountries(CountryRepository.java:42) at com.mobile.myapp.UI.mvp.Presenters.SignUpPresenter.cacheCountries(SignUpPresenter.java:40) at com.mobile.myapp.UI.mvp.Presenters.SignUpPresenter$CountriesSubscriber.onNext(SignUpPresenter.java:60) at com.mobile.myapp.UI.mvp.Presenters.SignUpPresenter$CountriesSubscriber.onNext(SignUpPresenter.java:49) at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.drainNormal(ObservableObserveOn.java:200) at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.run(ObservableObserveOn.java:252) at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:109) at android.os.Handler.handleCallback(Handler.java:751) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:154) at android.app.ActivityThread.main(ActivityThread.java:6077) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:866) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:756)
不重要,但如果你需要知道 defaultSubscriber class 是什么样子,它是:
DefaultSubscriber.java
public class DefaultSubscriber<T> implements Observer<T> {
Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull T t) {
}
@Override
public void onError(@NonNull Throwable e) {
Timber.e(e);
}
@Override
public void onComplete() {
}
public void unsubscribe(){
if(disposable!=null && !disposable.isDisposed()){
disposable.dispose();
}
}
}
这是一个常见的错误:just()
不会在括号内执行 "code",因为 just
取值,而不是计算。你需要 fromCallable
:
Observable.fromCallable(() -> db.countriesDao().addCountries(countriesList))
你也可以使用 Single observable
Single.create(new SingleOnSubscribe<List<Long>>() {
@Override
public void subscribe(SingleEmitter<List<Long>> emitter) throws Exception {
try {
List<Long> ids = db.countriesDao().addCountries(countriesList);
emitter.onSuccess(ids);
} catch (Throwable t) {
emitter.onError(t);
}
}})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribeWith(new DisposableSingleObserver<Long>() {
@Override
public void onSuccess(List<Long> ids) {
}
@Override
public void onError(Throwable e) {
}
});
更好的是,您可以使用 Completable
。其描述:表示一个没有任何值的计算,但仅指示完成或异常。
Completable.fromAction(() -> db.countriesDao().addCountries(list)).subscribe();
Note: Room doesn't support database access on the main thread unless you've called allowMainThreadQueries() on the builder because it might lock the UI for a long period of time. Asynchronous queries - queries that return instances of LiveData or Flowable are exempt from this rule because they asynchronously run the query on a background thread when needed.
所以你的代码可以是这样的
Completable.fromAction(() -> db.countriesDao()
.addCountries(list))
.subscribeOn(Schedulers.io())
.subscribe();
从room room 2.1.0-alpha02
,插入时可以使用(Completeable
,Single
,Maybe
)(
https://medium.com/androiddevelopers/room-rxjava-acb0cd4f3757)
例子
@Dao
interface UserDao{
@Insert
Completable insert(final User user); // currently, we must put final before user variable or you will get error when compile
}
正在使用
db.userDao().insert(user).subscribeOn(Schedulers.io()).subscribe(new Action() {
@Override
public void run() throws Exception {
// success
}
}, new Consumer < Throwable > () {
@Override
public void accept(Throwable throwable) throws Exception {
// error
}
});
确保你已经添加了两个依赖项, “实施 'io.reactivex.rxjava3:rxjava:3.0.0' 实施 'io.reactivex.rxjava3:rxandroid:3.0.0'"