如何用 RxJava Observer 替换 Asynctask?

How do I replace Asynctask with RxJava Observer?

我有一个带有 Room 数据库的测试项目。使用 Asynctask 我可以成功地将一个带有一些测试数据的对象插入到数据库中。我正在尝试学习 RxJava 并将 Asynctask 替换为 RxJava's observer,但它不起作用。我已经阅读了很多文档并观看了教程,但我认为我不太明白。相关代码如下:

这里我用 List:

中的数据设置我的 Room 对象
for(ObjectForArray item: listToDatabase) {
        myRoomEntity.setName( item.getName() );
        Log.d( "TAG", myRoomEntity.getName() );
    }

然后我尝试使用RxJava Observable 向数据库中插入数据。这最初是使用 Asynctask:

成功完成的
Observable<MyRoomEntity> myRX = Observable
            .just(myRoomEntity)
            .subscribeOn( Schedulers.io() )
            .observeOn( AndroidSchedulers.mainThread() );

myRX.subscribe( new Observer<MyRoomEntity>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d("TAG ONSUBSCRIBE", d.toString());

            try {
                myViewModel.insertDatabase( myRoomEntity );
                Log.d( "TAG", "Populating database Success" );
            }catch(Error error) {
                Log.d( "TAG", error.toString() );
            }
        }

OnNextOnErrorOnComplete 为空。

当我 运行 项目崩溃并出现错误时:

Cannot access database on the main thread since it may potentially lock the UI for a long period of time.

我显然使用 RxJava 是错误的,因为重点是在远离主线程的地方执行异步任务。

您收到此错误是因为您试图在主 (UI) 线程上插入对象。

你应该这样做:

Observable.fromCallable(() -> myViewModel.insertDatabase( myRoomEntity ))
            .subscribeOn( Schedulers.io() )
            .observeOn( AndroidSchedulers.mainThread() );

然后使用Observer订阅Observable。

请尝试像这样重构您的代码:

Completable.fromAction(() -> myViewModel.insertDatabase(myRoomEntity))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(() -> Log.d("TAG", "Populating database Success"),
                        throwable -> Log.d("TAG", throwable.toString()))

注意事项:

  1. 如果在订阅整个构造之前您的 myRoomEntity 不可用,请确保您使用 defer http://reactivex.io/documentation/operators/defer.html
  2. 您的订阅部分处理程序正在 "main" 上运行,这就是您收到崩溃的原因。
  3. 如果可能,避免不必要的 just 调用

我已经使用 RX java 代替 Asyntask,因为它在 android 9 中已被弃用 android 提供了多个替代品,例如 Executors、threads、Listenable Futures、Coroutines,因此您正在寻找如何使用 rxjava 实现它以及 RX Java java 如何提供帮助您要迁移只需先在 gradle

中添加这些依赖项
implementation "io.reactivex.rxjava2:rxjava:2.2.20"
implementation "io.reactivex.rxjava2:rxandroid:2.1.1"

一旦你导入让我们开始使用 RX java 我会告诉你你可以在哪里放置后台任务,预执行,post 像异步任务一样执行

让我们先用 Rx java 开始编码,我在方法中有注释,可以帮助您放置代码

 Observable.fromCallable(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {

            /// here is your background task


            return true;
        }
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Boolean>() {
                @Override
                public void onSubscribe(Disposable d) {

                    //// pre execute here is my progress dialog

                    showProgressDialog(getString(R.string.scanning));

                }

                @Override
                public void onNext(Boolean aBoolean) {

                    //// here is on sucess you can do anystuff here like
                    if (aBoolean){

                        /// if its value true you can go ahead with this

                    }

                }

                @Override
                public void onError(Throwable e) {

                    /// this helps you to go if there is any error show dialog whatever you wants here

                    Log.e("error of kind",e.getMessage() );

                }

                @Override
                public void onComplete() {

                    /// when your task done means post execute


                }
            });

完成后让我们开始实施

  Observable.fromCallable(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {

            /// here is your background task
            uribitmap =  getScannedBitmap(original, points);
            uri = Utils.getUri(getActivity(), uribitmap);
            scanner.onScanFinish(uri);

            return true;
        }
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Boolean>() {
                @Override
                public void onSubscribe(Disposable d) {

                    //// pre execute here is my progress dialog
                    showProgressDialog(getString(R.string.scanning));

                }

                @Override
                public void onNext(Boolean aBoolean) {

                    //// here is on sucess you can do anystuff here like
                    if (aBoolean){

                        /// if its value true you can go ahead with this

                    }

                }

                @Override
                public void onError(Throwable e) {

                    /// this helps you to go if there is any error show dialog whatever you wants here

                    Log.e("error of kind",e.getMessage() );

                }

                @Override
                public void onComplete() {

                    /// when your task done means post execute

                    uribitmap.recycle();
                    dismissDialog();
                }
            });
  

现在我将与执行者一起做这件事:

 /// pre execute you can trigger to progress dialog
            showProgressDialog(getString(R.string.scanning));

            ExecutorService executors = Executors.newSingleThreadExecutor();
            executors.execute(new Runnable() {
                @Override
                public void run() {
                    //// do background heavy task here
                    final Bitmap uribitmap =  getScannedBitmap(original, points);
                    uri = Utils.getUri(getActivity(), uribitmap);
                    scanner.onScanFinish(uri);
                    new Handler(Looper.getMainLooper()).post(new Runnable() {
                        @Override
                        public void run() {
                            //// Ui thread work like
                            uribitmap.recycle();
                            dismissDialog();
                        }
                    });
                }
            });