Accessing output from barrier action in CyclicBarrier

I was working through this quiz on the Oracle blog, and I have a question about how to obtain/access the output of the barrier action in a CyclicBarrier.

Blog link

https://blogs.oracle.com/javamagazine/post/quiz-yourself-happens-before-thread-synchronization-in-java-with-cyclicbarrier

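Code

 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;

 public class CBTest {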

    private List<Integer> results = Collections.synchronizedList(new ArrayList<>());
    
    class Calculator extends Thread {
        CyclicBarrier cb;
        int param;

        public Calculator(CyclicBarrier cb, int param) {
            this.cb = cb;
            this.param = param;
        }


        public void run() {
            try {
                results.add(param);
                System.out.println("going to await");
                cb.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

    void doCalculation() {
        CyclicBarrier cb = new CyclicBarrier(2, () -> {
            var ans = results.stream().mapToInt(v -> v.intValue()).sum();
            System.out.println("ANS IS "+ans);
            });
        new Calculator(cb, 2).start();
        new Calculator(cb, 3).start();
    }


    public static void main(String[] args) {
        var test = new CBTest();
        test.doCalculation();
    }
  }

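Please tell me how to get the value ans computed by the barrier action in the main method.

CyclicBarrier only accepts a Runnable, which does not return a value. You therefore need to store the result in an output container defined outside the Runnable, for example an AtomicReference declared as a field of CBTest. The Runnable can then set the value on it.

Here is a slightly modified version; my changes are marked with the comment CHANGE.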

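import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

public class CBTest{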
    private List<Integer> results = Collections.synchronizedList( new ArrayList<>() );

    /* CHANGE: Value holder. Could be another thread-safe class. */
    private AtomicReference<Integer> answer = new AtomicReference<>( 0 );

    class Calculator extends Thread{
        CyclicBarrier cb;
        int param;

        public Calculator( CyclicBarrier cb, int param ){
            this.cb = cb;
            this.param = param;
        }

        public void run(){
            try{
                results.add( param );
                System.out.println( "going to await" );
                cb.await();
                
                /* CHANGE: Both threads get the same value. */
                System.out.println( answer.get() );
            }
            catch( InterruptedException | BrokenBarrierException e ){
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

    void doCalculation(){
        CyclicBarrier cb = new CyclicBarrier( 2, () -> {
            var ans = results.stream().mapToInt( v -> v.intValue() ).sum();
            System.out.println( "ANS IS " + ans );
            
            /* CHANGE: Set the value here. */
            answer.set( ans );
        } );
        new Calculator( cb, 2 ).start();
        new Calculator( cb, 3 ).start();
    }

    public static void main( String[] args ){
        var test = new CBTest();
        test.doCalculation();
    }
}
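
The version above prints the value from the worker threads rather than from main. One way to read it in main is to keep references to the workers and join them before reading the holder. Below is a minimal sketch under that assumption; the class name BarrierJoinDemo and the compute helper are hypothetical and not part of the original code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; the name and compute() are illustrative only.
class BarrierJoinDemo {

    static int compute(int a, int b) {
        List<Integer> results = Collections.synchronizedList(new ArrayList<>());
        AtomicReference<Integer> answer = new AtomicReference<>(0);

        // The barrier action runs once, in the last thread to arrive, and stores the sum.
        CyclicBarrier cb = new CyclicBarrier(2,
                () -> answer.set(results.stream().mapToInt(Integer::intValue).sum()));

        List<Thread> workers = new ArrayList<>();
        for (int param : new int[] {a, b}) {
            Thread t = new Thread(() -> {
                try {
                    results.add(param);
                    cb.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
            workers.add(t);
            t.start();
        }

        // join() returns only after a worker has finished, i.e. after the
        // barrier action has run, so the holder is safe to read afterwards.
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return answer.get();
    }

    public static void main(String[] args) {
        System.out.println("ANS IS " + compute(2, 3)); // prints "ANS IS 5"
    }
}
```

Joining is enough here because each worker returns from await() only after the barrier action has completed, and Thread.join() makes the workers' writes visible to the joining thread.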

You can make doCalculation return the result synchronously by using an additional CountDownLatch. In that case, doCalculation blocks until the result has been computed:

    CountDownLatch countDownLatch = new CountDownLatch(1);

    int doCalculation() throws InterruptedException {
        AtomicInteger result = new AtomicInteger();
        CyclicBarrier cb = new CyclicBarrier(2, () -> {
            result.set(results.stream().mapToInt(v -> v.intValue()).sum());
            // count down
            countDownLatch.countDown();
        });
        new Calculator(cb, 2).start();
        new Calculator(cb, 3).start();
        // block until the barrier action has counted the latch down
        countDownLatch.await();
        return result.get();
    }

    public static void main(String[] args) throws InterruptedException {
        var test = new CBTest();
        System.out.println("ANS IS " + test.doCalculation());
    }
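
As a variation on the same idea, a CompletableFuture could serve as both the value holder and the latch: the barrier action completes it, and the caller blocks on join(). This is only a sketch, not the approach from the answers above; BarrierFutureDemo and compute are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; the name and compute() are illustrative only.
class BarrierFutureDemo {

    static int compute(int a, int b) {
        List<Integer> results = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Integer> answer = new CompletableFuture<>();

        // The barrier action publishes the sum by completing the future.
        CyclicBarrier cb = new CyclicBarrier(2,
                () -> answer.complete(results.stream().mapToInt(Integer::intValue).sum()));

        for (int param : new int[] {a, b}) {
            new Thread(() -> {
                try {
                    results.add(param);
                    cb.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    // Propagate the failure so the caller does not block forever.
                    answer.completeExceptionally(e);
                }
            }).start();
        }

        // Blocks until the barrier action (or a failing worker) completes the future.
        return answer.join();
    }

    public static void main(String[] args) {
        System.out.println("ANS IS " + compute(2, 3)); // prints "ANS IS 5"
    }
}
```

Compared with the separate AtomicInteger and CountDownLatch, this keeps the result and the "done" signal in one object, and a broken barrier surfaces to the caller as a CompletionException.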