Schedulers.io 创建数百个 RxCachedThreadSchedulers

Schedulers.io creates hundreds of RxCachedThreadSchedulers

我在我的 android 应用程序中使用 RxJava,它 运行 多次进入 OutOfMemoryError。我用设备管理器检查了一下,我刚刚注意到,我有超过 200 个线程,其中大部分处于等待状态,通常是 RxCachedThreadSchedulers。 OOMError 由于线程过多而引发。 我还注意到,如果我按下一个按钮,它会调用一个服务并获取一个令牌并将其缓存,线程数会增加 5!

所以,我搜索了一下,发现 Schedulers.io 可以创建无限线程。当我用 Schedulers.computation 替换每个 Schedulers.io 时,问题就消失了,但这没有任何意义,因为我像应该使用的那样使用 Schedulers.io。

那么我如何使用 Schedulers.io 并确保它不会创建太多线程?

更新

我是这样取消订阅的:

    final Scheduler.Worker worker = Schedulers.io().createWorker();
    worker.schedule(new Action0() {
        @Override
        public void call() {
            long last = lastServerCommunication.getMillis();
            LongPreference pref = new LongPreference(mSharedPreferences, PREF_KEY_LAST_SERVER_COMMUNICATION);
            pref.set(last);
            worker.unsubscribe();
        }
    });

更新 #2

我使用 Schedulers.io 的常规方式是例如:

public Observable<Scenario> load() {
    return Observable
            .create(new Observable.OnSubscribe<Scenario>() {
                @Override
                public void call(Subscriber<? super Scenario> subscriber) {
                    try {
                        Scenario scenario = mGson.fromJson(mSharedPreferences.getString("SCENARIO", null), Scenario.class);
                        subscriber.onNext(scenario);
                        subscriber.onCompleted();
                    } catch (Exception e) {
                        subscriber.onError(new Throwable());
                    }
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

并且:

    mSomeSubscription = mSomeManager.readFromDatabase()
            .subscribeOn(Schedulers.io())
            .subscribe(new Observer<List<SomeEntry>>() {
                @Override
                public void onCompleted() { }

                @Override
                public void onError(Throwable e) {
                    // some logging
                }

                @Override
                public void onNext(List<SomeEntry> Entries) {
                    // Some action
                }
            });

如果您使用 Schedulers.io().createWorker(),您必须在完成后 unsubscribe() Worker。常规的 RxJava 运算符不应该泄漏任何工作人员和线程。

好的,我找到原因了。查看说明,更新#2

return Observable.create(new Observable.OnSubscribe<Something>() {
            @Override
            public void call(Subscriber<? super Something> subscriber) {
                try {
                  // Some action
                    subscriber.onNext(scenario);
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(new Throwable());
                }
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

当你像这样创建冷的 Observable 序列时,你必须确保你在订阅者上调用了 onCompleted,见上文 subscriber.onCompleted();。 好吧,它在代码中的某些地方不存在,因此生成了 io 线程。

非常感谢 akarnokd 的帮助!

太棒了,如果您不调用 onComplete 方法,那么线程(由 Schedulers.io() 创建)将一直等待并存在于那些冷的可观察对象。