为什么我的线程数随着 rxjava 重试错误而上升?

Why is my thread count going up with rxjava retry error?

在函数方法中,我抛出一个 nullPointerException 并打印出线程计数 (sout(Thread.activeCount)。似乎每次我重试错误时,threadCount 都会增加 1。为什么会这样? 我应该假设它只会收集垃圾吗?

public void start(int time) {
    Observable.interval(time, TimeUnit.SECONDS)
            .doOnNext(t -> function())
            .doOnError(System.out::println)
            .retry()
            .subscribe();

}

可能 Observable.interval() 在您每次重试时从其计算调度程序线程池中获取新线程。也许这样做是因为之前的线程仍在使用中?恕我直言,它不应该无限期地上升,因为它表明存在一些错误。

默认情况下,interval 重载使用的 computation Scheduler 限制为 CPU 数量 (Runtime.getRuntime().availableProcessors())。当您因重试而不断重新订阅时,Scheduler 将启动越来越多的工作线程。但是,线程总数不应无休止地增长。

如果你不想有那么多工作线程,考虑使用 Schedulers.single()Schedulers.io() 后者重用工作线程:

Observable.interval(time, TimeUnit.SECONDS, Schedulers.single())
        .doOnNext(t -> function())
        .doOnError(System.out::println)
        .retry()
        .subscribe();

Observable.interval(time, TimeUnit.SECONDS, Schedulers.io())
        .doOnNext(t -> function())
        .doOnError(System.out::println)
        .retry()
        .subscribe();