Java AtomicInteger 在一百万次迭代后不等于一百万(包括最小示例)

Java AtomicInteger not equal to a million after a million iterations (minimal example included)

我正在编写一个程序来计算数组中 0 到 100 万(不包括)之间的整数的平方。我最多使用 8 个线程(含)。
为了索引数组,我使用了一个原子整数。
我希望 运行 方法中的 for 循环体被执行一百万次,而不管线程数如何。
为了计算它被执行了多少次,我使用了另一个原子整数。

   static AtomicInteger cnt = new AtomicInteger(0);
    static AtomicInteger ii = new AtomicInteger(0);
    static long[] arr = new long[1_000_000];

    public static class Worker extends Thread {

        public static void main(String[] args) throws IOException, InterruptedException {
            int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
            for (int n = 1; n <= maxThreads; n++) {
                int numThreads = n;
                Thread[] threads = new Thread[numThreads];
                ii = new AtomicInteger(0);
                for (int i = 0; i < threads.length; i++) {
                    threads[i] = new Worker();
                    threads[i].start();
                }
                for (Thread t : threads) {
                    t.join();
                }
                System.out.printf("%d threads, cnt is %d\n" , threads.length, cnt.get());
                cnt.set(0);
            }
        }

        @Override
        public void run() {
            for (int i = ii.get(); i < arr.length; i = ii.getAndIncrement()) {
                arr[i] = (long)i*i;
                cnt.getAndIncrement();
            }
        }
    }

预期执行结果为:

1 threads, cnt is 1000000
2 threads, cnt is 1000000
3 threads, cnt is 1000000
4 threads, cnt is 1000000
5 threads, cnt is 1000000
6 threads, cnt is 1000000
7 threads, cnt is 1000000
8 threads, cnt is 1000000

但是在 运行ning 之后我得到了以下信息:

1 threads, cnt is 1000001
2 threads, cnt is 1000002
3 threads, cnt is 1000003
4 threads, cnt is 1000002
5 threads, cnt is 1000003
6 threads, cnt is 1000002
7 threads, cnt is 1000002
8 threads, cnt is 1000005

你能帮我调试一下吗?

你为什么要这么做:

ii = new AtomicInteger(0);

在你的主循环中?整个程序有 1 个名为 ii 的变量,但它被分配了一个值轮播,同时在两者之间启动线程,这意味着每个线程都有您创建的任意一个(核心* 2)线程,因此,您的应用程序的确切工作方式取决于月相。

只需删除该行。

另请注意,您循环 n 'available processors *2' 次,然后 循环中循环一次 for n again。那肯定不是你的意图。如果是,则您的变量名在撒谎。

有一些小问题,例如运行之间的 ii = new AtomicInteger(0) 赋值或抛出 IOException 的声明永远不会发生。虽然对 ii 的赋值在这里没有影响,因为它发生在没有线程访问 ii 的地方,但它可能会分散注意力,因为它与多线程代码的既定代码模式不同。您应该像在两次运行之间重置 cnt 一样重置 ii

实际问题是循环的起点:

for (int i = ii.get(); i < arr.length; i = ii.getAndIncrement()) {

您正在使用 get 读取而不增加值,因此多个线程可能会读取相同的值,从而导致随后的 arr[i] = (long)i*i; 发生数据竞争(此处未引起注意,但当然,应该避免)并执行比必要更多的迭代,正如您注意到的那样 cnt 更新。您应该像对后续迭代一样对初始索引使用 getAndIncrement(),以确保每个线程访问不同的数组索引。

static final AtomicInteger cnt = new AtomicInteger(0);
static final AtomicInteger ii = new AtomicInteger(0);
static final long[] arr = new long[1_000_000];

public static class Worker extends Thread {

    public static void main(String[] args) throws InterruptedException {
        int maxThreads = Runtime.getRuntime().availableProcessors() * 2;

        for (int numThreads = 1; numThreads <= maxThreads; numThreads++) {
            Thread[] threads = new Thread[numThreads];
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new Worker();
                threads[i].start();
            }
            for (Thread t : threads) {
                t.join();
            }
            System.out.printf("Used %d threads, cnt is %d\n" , numThreads, cnt.get());
            cnt.set(0);
            ii.set(0);
        }
    }

    @Override
    public void run() {
        for(int i = ii.getAndIncrement(); i < arr.length; i = ii.getAndIncrement()) {
            arr[i] = (long)i*i;
            cnt.getAndIncrement();
        }
    }
}
Used 1 threads, cnt is 1000000
Used 2 threads, cnt is 1000000
Used 3 threads, cnt is 1000000
Used 4 threads, cnt is 1000000
Used 5 threads, cnt is 1000000
Used 6 threads, cnt is 1000000
Used 7 threads, cnt is 1000000
Used 8 threads, cnt is 1000000
Used 9 threads, cnt is 1000000
Used 10 threads, cnt is 1000000
Used 11 threads, cnt is 1000000
Used 12 threads, cnt is 1000000
Used 13 threads, cnt is 1000000
Used 14 threads, cnt is 1000000
Used 15 threads, cnt is 1000000
Used 16 threads, cnt is 1000000

请注意,复杂的并行处理框架不使用此类原子索引更新,而是在启动线程之前根据预期的目标并行度将范围分成大小相等的子范围,因此每个线程都可以循环其预先分配的范围使用普通的局部索引变量。

使用 Arrays.parallelSetAll(arr, i -> (long)i*i); 或 Stream API 时免费获得。