Java 8: ArrayDeque<>.poll returning null in parallel environment

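I am trying to maintain a list of items across multiple threads, one item per thread (for example, one socket connection per thread). I keep this list in an ArrayDeque<>. The problem I am facing is that the number of items in the ArrayDeque<> exceeds the number of threads in the thread pool.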

Here is my code:

package com.practice;

import java.util.ArrayDeque;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletableFutureApp implements AutoCloseable {
    private static void sleep(long timeMS) {
        try {
            Thread.sleep(timeMS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        try (CompletableFutureApp completableFutureApp = new CompletableFutureApp()) {

            Runnable[] tasksList1 = new Runnable[100];

            for (int i = 0; i < tasksList1.length; i++) {
                String msg1 = "TaskList 1 no.: " + i;

                tasksList1[i] = () -> {
                    //System.out.println(msg1);
                    sleep(300);
                };
            }

            completableFutureApp
                    .executeTasks(tasksList1)
                    .thenRun(() -> System.out.println("All tasks completed successfully"));
        }
    }

    private ExecutorService threadPool = Executors.newWorkStealingPool(10);
    private ArrayDeque<Integer> itemsAvailable = new ArrayDeque<>();

    private CompletableFuture executeTasks(Runnable... tasks) {
        CompletableFuture[] futures = new CompletableFuture[tasks.length];

        for (int i = 0; i < tasks.length; i++) {
            Runnable task = tasks[i];

            futures[i] = CompletableFuture.runAsync(() -> {
                Integer item = itemsAvailable.poll();
                if (item == null)
                    item = new Random().nextInt();

                task.run();

                itemsAvailable.add(item);

                System.out.println("Items available: " + itemsAvailable.size());
            }, threadPool);
        }

        return CompletableFuture.allOf(futures).thenRun(() -> System.out.println("Items available at last: " + itemsAvailable.size()));
    }

    @Override
    public void close() {
        threadPool.shutdown();
        try {
            threadPool.awaitTermination(100, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

In my code I am creating a work-stealing pool of size 10. Can anyone tell me why the number of elements in the ArrayDeque<> exceeds 10 in this case?

From the Javadoc:

[Array deques] are not thread-safe; in the absence of external synchronization, they do not support concurrent access by multiple threads.

Either use external synchronization, or pick a different class that is specifically designed for concurrent access.
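As a rough illustration of the second option, here is a minimal sketch that swaps the ArrayDeque for a ConcurrentLinkedQueue. The class name PooledItemsSketch, the method runTasks, and the itemsCreated counter are made up for this example and are not part of the question's code; the short sleep stands in for the real work. With a thread-safe queue, poll() should only return null when every existing item is currently held by a running task, so the number of items created should not exceed the pool's parallelism.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class PooledItemsSketch {

    // Hypothetical helper: runs taskCount short tasks on a work-stealing pool
    // and returns how many distinct items had to be created along the way.
    static int runTasks(int taskCount, int parallelism) {
        ExecutorService threadPool = Executors.newWorkStealingPool(parallelism);
        // Thread-safe replacement for the ArrayDeque used in the question.
        ConcurrentLinkedQueue<Integer> itemsAvailable = new ConcurrentLinkedQueue<>();
        AtomicInteger itemsCreated = new AtomicInteger();

        CompletableFuture<?>[] futures = new CompletableFuture<?>[taskCount];
        for (int i = 0; i < taskCount; i++) {
            futures[i] = CompletableFuture.runAsync(() -> {
                // poll() returns null only if the queue was empty at that moment.
                Integer item = itemsAvailable.poll();
                if (item == null) {
                    item = itemsCreated.incrementAndGet();
                }
                try {
                    Thread.sleep(20); // stand-in for the real task
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                itemsAvailable.add(item); // hand the item back for reuse
            }, threadPool);
        }

        // Wait for all tasks before shutting the pool down.
        CompletableFuture.allOf(futures).join();
        threadPool.shutdown();
        return itemsCreated.get();
    }

    public static void main(String[] args) {
        System.out.println("Items created: " + runTasks(100, 10));
    }
}
```

The other option, external synchronization, would mean keeping the ArrayDeque and wrapping every poll(), add(), and size() call in a synchronized block on one shared lock object. That also works, but it is easy to miss a call site, which is why a concurrent collection is used in this sketch.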