将 RxJava 1.0 代码更新到 RxJava 2.0

Updating RxJava 1.0 code to RxJava 2.0

我正在更新一些旧的 android 应用程序代码,但无法弄清楚如何将在 RxJava/RxAndroid 1.0 中工作的代码转换为 RxJava/RxAndroid 2.0。我试图转换的代码负责获取信号强度指示器并每秒用它的值更新图表。我设法在 RxJava 2.0 中破解的东西根本不起作用,几秒钟后应用程序因 SIGABORT 而崩溃。我不知道我错过了什么?

我的 RxJava 1 代码:

rx.Observer myObserver = new rx.Observer<Float>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Float aFloat)
            {
                addEntry(aFloat);
            }
        };

        subscription = rx.Observable.interval(1, TimeUnit.SECONDS)
                            .flatMap(new Func1<Long, rx.Observable<Float>>() {
                                @Override
                                public rx.Observable<Float> call(Long aLong) {
                                    return getSignalStrength();
                                }
                            })
                            .subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe(myObserver);


private rx.Observable<Float> getSignalStrength()
{
    return rx.Observable.create(subscriber ->
    {
        WifiManager wifiManager = (WifiManager) getContext().getSystemService(Context.WIFI_SERVICE);
        Integer value = wifiManager.getConnectionInfo().getRssi();
        subscriber.onNext(value.floatValue());
        subscriber.onCompleted();
    });
}

以及我在尝试做 RxJava 2.0 时的想法:

        DisposableObserver<Float> disposableObserver = new DisposableObserver<Float>()
        {
            @Override
            public void onNext(Float value)
            {
                addEntry(value);
            }

            @Override
            public void onError(Throwable e)
            {
            }

            @Override
            public void onComplete()
            {
            }
        };

        Disposable disposable = Observable.interval(1, TimeUnit.SECONDS)
                .flatMap(new Function<Long, ObservableSource<Float>>()
                {
                    @Override
                    public ObservableSource<Float> apply(Long aLong) throws Exception
                    {
                        return signalStrengthInfo.getSignalStrength();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(disposableObserver);


private Observable<Float> getSignalStrength()
{
    return Observable.create(subscriber ->
    {
        wifiManager = (WifiManager) context.getSystemService(Context.WIFI_SERVICE);
        Integer signalStrength = wifiManager.getConnectionInfo().getRssi();
        subscriber.onNext(Float.valueOf(signalStrength));
        subscriber.onComplete();
    });
}

你在 Rx2 中的代码确实有问题。

只需查看简单的解决方案(对 kotlin 感到抱歉),但如果您在使用 java 时遇到问题,请在评论中联系我。

val disposable = Observable.interval(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap { getSignalStrength() }
                .doOnNext { addEntry(it) }
                .subscribe()


private fun getSignalStrength(): Observable <Float> {
            val wifiManager = this.getSystemService(Context.WIFI_SERVICE) as WifiManager
            val signalStrength = wifiManager.connectionInfo.rssi
            return Observable.just(signalStrength.toFloat())
    }