从不同的调度器设置和读取 rxJava 链中的实例变量值

Setting and reading instance variable value within rxJava chain from different Schedulers

我不确定 reading/writing 来自具有不同调度程序的 rxJava 链实例变量的安全性。有个小例子


public class RxJavaThreadSafety {

    private int variable = 0;

    // First call 
    public void doWriting() {
        Single.just(255)
                .doOnSuccess(
                        newValue -> variable = newValue
                )
                .subscribeOn(Schedulers.io())
                .subscribe();
    }

    // Second call
    public void doReadingRxChain() {
        Single.fromCallable((Callable<Integer>) () -> variable)
                .subscribeOn(Schedulers.computation())
                .subscribe(
                        result -> System.out.println(result)
                );
    }

    // Third call
    public void doReading() {
        System.out.println(variable);
    }

}

为简单起见,假设这三个方法一个接一个地被调用

我的问题:在 io 调度程序“中”设置变量并最近从计算调度程序或主线程“读取”该变量是否线程安全?

我认为这不是线程安全的,但我希望一些 rxJava 和并发专家来证明这一点

不,这不是线程安全的。

当您使用 subscribeOn 时,这意味着调用 subscribe() 会将生成项目的任务添加到调度程序的工作队列中。

doWriting()doReadingRxChain() 方法将任务添加到不同的调度程序。无法保证 doWriting() 中的链甚至会在 doReadingRxChain() 之前开始到 运行。例如,如果所有 IO 线程都忙,就会发生这种情况。

还有一个更根本的问题:您在一个线程中写入 variable 的值并在另一个线程中读取它。在没有任何并发​​控制的情况下,无法保证 variable 的新值被读取它的线程看到。一种解决方法是将变量声明为 volatile:

private volatile int variable = 0;