如何用 rxjava 替换使用 volatile 变量的线程间通信?

How can I replace inter thread communication using volatile variables with rxjava?

我有一个应用程序,它通过让一个线程在另一个线程检查的某个对象上设置一个 volatile 变量来在线程之间进行大量通信。我发现这很容易出错,我想尝试使用 RxJava 替换它,但在某些情况下我不知道如何转换。

我现在遇到的问题是我有两个线程,我们称一个为控制器,另一个为测量器。测量器的工作是每 100 毫秒记录一些量。控制器会与应用程序的各个部分进行大量通信,并且它会时不时地告诉测量器更改它正在测量的内容。现在它通过设置一个 volatile 变量来做到这一点,并且测量器循环的每次迭代都会检查该变量以查看要测量的内容。

测量器不能与控制器在同一个线程中,因为测量需要时间,而控制器不能延迟它正在做的其他工作。

感觉解决方案就像使控制器成为可观察对象,只要测量器的指令需要更新,它就会发出一个项目,但我能看到测量器在接收到事件时改变其行为的唯一方法就是让这些事件的订阅者像以前一样设置 volatile 变量,然后我就没有任何进展。

我想知道我是否可以以某种方式获取控制器发出的项目流并将其转换为一遍又一遍地重复每个项目直到控制器发出不同项目的流,然后我可以订阅这些重复的项目在测量器中,它会在每次收到测量时进行测量。这是正确的方法吗?如果是,我如何将控制器发出的项目转换为重复的项目流?

我对 Rx 比较陌生,但我会使用 BehaviorSubject。您可以使用 distinctUntilChanged(),或将其与计时器 Observable 结合使用:

    public enum Stat { FOO, BAR }

    public class Controller
    {
        private Subject<Stat> statSubject;

        public Controller()
        {
            statSubject = BehaviorSubject.<Stat>create().toSerialized();
        }

        public Observable<Stat> getStatChange()
        {
            return statSubject.distinctUntilChanged();
        }

        public void setStat( Stat stat )
        {
            statSubject.onNext( stat );
        }
    }

    public class Measurer
    {
        public Measurer( Controller controller )
        {
            Observable.timer( 1, TimeUnit.SECONDS, Schedulers.newThread() )
                .repeat()
                .withLatestFrom(
                        controller.getStatChange(),
                        ( __, stat ) -> stat ) // ignore the Long emitted by timer
                .subscribe( this::measureStat );
        }

        private void measureStat( Stat stat )
        {
            switch( stat )
            {
            case FOO:
                measureFoo();
                break;

            default:
                measureBar();
                break;
            }
        }

        private void measureBar()
        {
            System.out.println( "Measuring Bar" );
        }

        private void measureFoo()
        {
            System.out.println( "Measuring Foo" );
        }
    }

    @Test
    public void testMeasureStats() throws InterruptedException
    {
        Controller controller = new Controller();
        controller.setStat( Stat.BAR );

        @SuppressWarnings( "unused" )
        Measurer measurer = new Measurer( controller );

        Thread.sleep( 5000 );

        controller.setStat( Stat.FOO );

        Thread.sleep( 5000 );

        controller.setStat( Stat.BAR );

        Thread.sleep( 5000 );
    }

输出:

Measuring Bar
Measuring Bar
Measuring Bar
Measuring Bar
Measuring Foo
Measuring Foo
Measuring Foo
Measuring Foo
Measuring Foo
Measuring Bar
Measuring Bar
Measuring Bar
Measuring Bar
Measuring Bar