并行计数 - Java

Parallel counting - Java

我没有 CS 背景。我真的是并行编程的新手,我不知道 运行 一个程序时硬件是如何工作的。但是,我注意到以下几点。假设我有:

public class Counter {
    
    private static int parallelCount = 0;
    private static int sequentialCount = 0;

    public static void main(String[] args) {
        
        int n = 1000;
        
        // I count in parallel:
        IntStream.range(0, n).parallel().forEach(i -> {
            parallelCount++;
        });
        
        // I count sequentially:
        for (int i = 0; i < n; i++) {
            sequentialCount++;
        }
        
        System.out.println("parallelCount   = " + parallelCount);
        System.out.println("sequentialCount = " + sequentialCount);

    }

}

为什么我可能会得到:

parallelCount   = 984
sequentialCount = 1000

我想这与硬件和编译器访问内存的方式有关。我真的很想知道为什么会这样。一种可能的解决方案是什么?

只要有多个线程可以访问一个可变值,系统就会失去同步,这意味着您正面临着这种问题。没有人能确定结果会怎样,而且很多时候结果都是错误的。您无法保证哪个线程将最后写入值。

因此,您需要synchronize访问共享资源(您递增的整数)以便所有线程获得最新的更新值并且答案始终正确。

来到您的程序中,您可以尝试使 parallelCount 变量成为原子整数,例如 AtomicInteger parallelCount = new AtomicInteger(); 原子整数是线程安全的,这意味着它们可以在没有 运行 系统的情况下同时更新不同步。

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class Counter {

    private static AtomicInteger parallelCount = new AtomicInteger();
    private static int sequentialCount = 0;

    public static void main(String[] args) {

        int n = 1000;

        // I count in parallel:
        IntStream.range(0, n).parallel().forEach(i -> {
            parallelCount.getAndIncrement();
        });

        // I count sequentially:
        for (int i = 0; i < n; i++) {
            sequentialCount++;
        }

        System.out.println("parallelCount   = " + parallelCount);
        System.out.println("sequentialCount = " + sequentialCount);

    }

}

  • 如您所料,标准 for 循环会将 sequentialCount 递增 1000 次
  • 关于并行流,应用程序将尝试打开多个需要并行执行您的函数的线程。在这种情况下,多个线程可以同时增加值并将值存储到int。

例如,假设我们有两个并行工作的线程,并且想要增加变量 parallelCount 的值。如果 parallelCount 包含值 50。两个线程将读取值 50 并计算新值 51 并将其存储到内存中。

这种方法会产生其他并发问题。为了解决这个问题,可以使用同步、锁定、原子类,或者其他的方法。

AtomicInteger

我们可以在使用并行流时使用 AtomicInteger class 来自 Java 并发包,因为在使用原始数据类型时行为可能无法预测

import java.util.stream.IntStream;
import java.util.concurrent.atomic.AtomicInteger;
public class Main
{
    private static AtomicInteger parallelCount = new AtomicInteger();
    private static int sequentialCount = 0;

    public static void main(String[] args) {
        System.out.println("Hello World");
        int n = 100000;
        
        // I count in parallel:
        IntStream.range(0, n).parallel().forEach(i -> {
            parallelCount.incrementAndGet();
        });
        
        // I count sequentially:
        for (int i = 0; i < n; i++) {
            sequentialCount++;
        }
        
        System.out.println("parallelCount   = " + parallelCount);
        System.out.println("sequentialCount = " + sequentialCount);
    }
}

多个 thead 执行非原子操作(增加值)。 您编写的代码转换为字节码,可能会导致如下情况:

为避免这种情况,您需要同步对该关键代码的访问。 但请注意,如果您的所有代码都是关键代码,那么使用多线程是多余的。