将 AsyncTask 更改为 Rxjava

Changing AsyncTask to Rxjava

我在将 AsyncTask doInBackground 进程转换为 RxJava 时遇到了挑战。我很想知道如何将其转换为 Rx Java,因为我尝试过的 none 正在运行。

new AsyncTask<Void, Void, Integer>() {
            @Override
            protected Integer doInBackground(Void... voids) {

                return mDAO.getCount();
            }

            @Override
            protected void onPostExecute(Integer count) {
                if (count == 0)
                    mCount.setText("All Notifications");
                else
                    mCount.setText("New Notificaiton "+count);
            }
        }.execute();

我为 Rx 尝试了这个

Observable<Integer> count = Observable.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return mDAO.getCount();
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
        count.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                mDisposable.add(d);
            }

            @Override
            public void onNext(Integer integer) {
                if (integer == 0)
                    mCount.setText("All Notifications");
                else
                    mCount.setText("New Notification "+count);

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

我得到这个而不是计数

Count io.reactivex.internal.operators.observable.ObservableObserveOn@5ccee5b

我该如何解决这个问题?谢谢。

您的实施是出现此错误的原因。您应该改用单个可调用对象。这应该 100% 有效,如果您对此有任何挑战,请告诉我。

Single.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return mDAO.getCount();
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnSuccess(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        if (integer == 0)
                            mCount.setText("All Notifications");
                        else
                            mCount.setText("New Notification "+integer);
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {

                    }
                })
                .subscribe();