当线程池大小小于执行的任务数时,使用 newFixedThreadPool 的多线程程序不会 运行 作为例外

Multi threaded program using newFixedThreadPool doesn't run as excepted when the thread pool size is less than the number of tasks executed

    package com.playground.concurrency;

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class MyRunnable implements Runnable {
        private String taskName;

        public String getTaskName() {
            return taskName;
        }

        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }

        private int processed = 0;

        public MyRunnable(String name) {
            this.taskName = name;
        }

        private boolean keepRunning = true;

        public boolean isKeepRunning() {
            return keepRunning;
        }

        public void setKeepRunning(boolean keepRunning) {
            this.keepRunning = keepRunning;
        }

        private BlockingQueue<Integer> elements = new LinkedBlockingQueue<Integer>(10);

        public BlockingQueue<Integer> getElements() {
            return elements;
        }

        public void setElements(BlockingQueue<Integer> elements) {
            this.elements = elements;
        }

        @Override
        public void run() {
            while (keepRunning || !elements.isEmpty()) {
                try {
                    Integer element = elements.take();
                    Thread.sleep(10);
                    System.out.println(taskName +" :: "+elements.size());
                    System.out.println("Got :: " + element);
                    processed++;
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
            System.out.println("Exiting thread");

        }

        public int getProcessed() {
            return processed;
        }

        public void setProcessed(int processed) {
            this.processed = processed;
        }

    }

package com.playground.concurrency.service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.playground.concurrency.MyRunnable;

public class TestService {
    public static void main(String[] args) throws InterruptedException {
        int roundRobinIndex = 0;
        int noOfProcess = 10;
        List<MyRunnable> processes = new ArrayList<MyRunnable>();
        for (int i = 0; i < noOfProcess; i++) {
            processes.add(new MyRunnable("Task : " + i));
        }
        ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(5);

        for (MyRunnable process : processes) {
            threadPoolExecutor.execute(process);
        }
        int totalMessages = 1000;
        long start = System.currentTimeMillis();
        for (int i = 1; i <= totalMessages; i++) {
            processes.get(roundRobinIndex++).getElements().put(i);
            if (roundRobinIndex == noOfProcess) {
                roundRobinIndex = 0;
            }
        }
        System.out.println("Done putting all the elements");
        for (MyRunnable process : processes) {
            process.setKeepRunning(false);
        }
        threadPoolExecutor.shutdown();
        try {
            threadPoolExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long totalProcessed = 0;
        for (MyRunnable process : processes) {
            System.out.println("task " + process.getTaskName() + " processd " + process.getProcessed());
            totalProcessed += process.getProcessed();
        }
        long end = System.currentTimeMillis();
        System.out.println("total time" + (end - start));

    }

}

我有一个从 LinkedBlockingQueue 中读取元素的简单任务。我创建了这些任务的多个实例并按 ExecutorService 执行。当 noOfProcess 和线程池大小相同时,此程序按预期工作。(例如:noOfProcess=10thread pool size=10)。

但是,如果 noOfProcess=10thread pool size =5 则主线程在处理了一些项目后继续在下面的行等待。

processes.get(roundRobinIndex++).getElements().put(i);

我做错了什么?

要修复该行为,您需要执行以下操作之一:

  • 有足够的 Runnable,每个都有足够的队列容量来接收所有 1,000 条消息(例如:100 个容量为 10 或以上的 Runnable;或 10 个容量为 100 或以上的 Runnable),或
  • 有一个足够大的线程池来容纳所有的 Runnables,这样每个 Runnables 都可以启动 运行ning。

如果其中之一没有发生,ExecutorService 将不会启动额外的 Runnable。主工作线程将继续向每个队列添加项目,包括非 运行ning Runnables 的项目,直到它遇到一个已满的队列,此时它会阻塞。对于 10 个 Runnable 和线程池大小 5,第一个填满的队列将是第 6 个 Runnable。如果您只有 6 个 Runnable,这也是一样的。重要的一点是,您的 Runnable 至少比线程池中的空间多一个。

来自newFixedThreadPool() Javadoc

If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.

考虑一个包含 2 个进程和 1 的线程池大小的更简单示例。您将被允许创建第一个进程并将其提交给 ExecutorService(因此 ExecutorService 将启动并 运行 它)。但是,ExecutorService 不允许第二个进程 运行。但是,您的主线程不会注意这一点,它会继续将元素放入第二个进程的队列中,即使没有任何东西在消耗它。

您的代码适用于 noOfProcess=10thread pool size=5 – 如果您还将队列大小更改为 100,如下所示:new LinkedBlockingQueue<>(100).

您可以观察到这种行为——非运行ning Runnable 的队列已满——如果您更改此行:

processes.get(roundRobinIndex++).getElements().put(i);

对此(这是相同的逻辑代码,但保存了对象引用以供在 println() 输出中使用):

MyRunnable runnable = processes.get(roundRobinIndex++);
BlockingQueue<Integer> elements = runnable.getElements();
System.out.println("attempt to put() for " + runnable.getTaskName() + " with " + elements.size() + " elements");
elements.put(i);

啊是的。好旧的僵局。

发生的情况是:您向 ExecutorService 提交 10 个任务,然后通过 .put(i) 发送作业。当任务 5 的队列已满时,这会按预期阻塞任务 5。现在任务 5 目前没有被执行,事实上永远不会被执行,因为任务 0 到 4 仍在阻塞你的 FixedThreadPool,阻塞在 run() 方法中的 .take() 等待新的工作来自 .put(i),他们永远也得不到。

这个错误是您代码中的一个基本设计缺陷,有无数种方法可以修复它,其中之一就是增加线程池大小。

我的建议是你回到绘图板重新考虑主要方法中的结构。

既然你发布了你的代码,有一些提示:

1.: 发布你的整个代码可以被解释为调用 'pls fix my code',我们鼓励你省略所有不必要的细节(比如所有那些 getter 和 setter)。也许检查 https://whosebug.com/help/minimal-reproducible-example

2.: 在同一个正文中发布两个 类 让事情有点复杂。下次分吧。

3.:(吹毛求疵) processes.get(roundRobinIndex++).getElements().put(i); 像你在这里做的那样组合两个操作是不好的风格,因为它会使你的代码对其他人来说更难阅读。你可以只写: processes.get(i % noOfProcesses).getElements().put(i);