当线程池大小小于执行的任务数时,使用 newFixedThreadPool 的多线程程序不会 运行 作为例外
Multi threaded program using newFixedThreadPool doesn't run as excepted when the thread pool size is less than the number of tasks executed
package com.playground.concurrency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class MyRunnable implements Runnable {
private String taskName;
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
private int processed = 0;
public MyRunnable(String name) {
this.taskName = name;
}
private boolean keepRunning = true;
public boolean isKeepRunning() {
return keepRunning;
}
public void setKeepRunning(boolean keepRunning) {
this.keepRunning = keepRunning;
}
private BlockingQueue<Integer> elements = new LinkedBlockingQueue<Integer>(10);
public BlockingQueue<Integer> getElements() {
return elements;
}
public void setElements(BlockingQueue<Integer> elements) {
this.elements = elements;
}
@Override
public void run() {
while (keepRunning || !elements.isEmpty()) {
try {
Integer element = elements.take();
Thread.sleep(10);
System.out.println(taskName +" :: "+elements.size());
System.out.println("Got :: " + element);
processed++;
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("Exiting thread");
}
public int getProcessed() {
return processed;
}
public void setProcessed(int processed) {
this.processed = processed;
}
}
package com.playground.concurrency.service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.playground.concurrency.MyRunnable;
public class TestService {
public static void main(String[] args) throws InterruptedException {
int roundRobinIndex = 0;
int noOfProcess = 10;
List<MyRunnable> processes = new ArrayList<MyRunnable>();
for (int i = 0; i < noOfProcess; i++) {
processes.add(new MyRunnable("Task : " + i));
}
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(5);
for (MyRunnable process : processes) {
threadPoolExecutor.execute(process);
}
int totalMessages = 1000;
long start = System.currentTimeMillis();
for (int i = 1; i <= totalMessages; i++) {
processes.get(roundRobinIndex++).getElements().put(i);
if (roundRobinIndex == noOfProcess) {
roundRobinIndex = 0;
}
}
System.out.println("Done putting all the elements");
for (MyRunnable process : processes) {
process.setKeepRunning(false);
}
threadPoolExecutor.shutdown();
try {
threadPoolExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
long totalProcessed = 0;
for (MyRunnable process : processes) {
System.out.println("task " + process.getTaskName() + " processd " + process.getProcessed());
totalProcessed += process.getProcessed();
}
long end = System.currentTimeMillis();
System.out.println("total time" + (end - start));
}
}
我有一个从 LinkedBlockingQueue
中读取元素的简单任务。我创建了这些任务的多个实例并按 ExecutorService
执行。当 noOfProcess
和线程池大小相同时,此程序按预期工作。(例如:noOfProcess=10
和 thread pool size=10
)。
但是,如果 noOfProcess=10
和 thread pool size =5
则主线程在处理了一些项目后继续在下面的行等待。
processes.get(roundRobinIndex++).getElements().put(i);
我做错了什么?
要修复该行为,您需要执行以下操作之一:
- 有足够的 Runnable,每个都有足够的队列容量来接收所有 1,000 条消息(例如:100 个容量为 10 或以上的 Runnable;或 10 个容量为 100 或以上的 Runnable),或
- 有一个足够大的线程池来容纳所有的 Runnables,这样每个 Runnables 都可以启动 运行ning。
如果其中之一没有发生,ExecutorService 将不会启动额外的 Runnable。主工作线程将继续向每个队列添加项目,包括非 运行ning Runnables 的项目,直到它遇到一个已满的队列,此时它会阻塞。对于 10 个 Runnable 和线程池大小 5,第一个填满的队列将是第 6 个 Runnable。如果您只有 6 个 Runnable,这也是一样的。重要的一点是,您的 Runnable 至少比线程池中的空间多一个。
来自newFixedThreadPool() Javadoc:
If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.
考虑一个包含 2 个进程和 1 的线程池大小的更简单示例。您将被允许创建第一个进程并将其提交给 ExecutorService(因此 ExecutorService 将启动并 运行 它)。但是,ExecutorService 不允许第二个进程 运行。但是,您的主线程不会注意这一点,它会继续将元素放入第二个进程的队列中,即使没有任何东西在消耗它。
您的代码适用于 noOfProcess=10
和 thread pool size=5
– 如果您还将队列大小更改为 100,如下所示:new LinkedBlockingQueue<>(100)
.
您可以观察到这种行为——非运行ning Runnable 的队列已满——如果您更改此行:
processes.get(roundRobinIndex++).getElements().put(i);
对此(这是相同的逻辑代码,但保存了对象引用以供在 println()
输出中使用):
MyRunnable runnable = processes.get(roundRobinIndex++);
BlockingQueue<Integer> elements = runnable.getElements();
System.out.println("attempt to put() for " + runnable.getTaskName() + " with " + elements.size() + " elements");
elements.put(i);
啊是的。好旧的僵局。
发生的情况是:您向 ExecutorService 提交 10 个任务,然后通过 .put(i)
发送作业。当任务 5 的队列已满时,这会按预期阻塞任务 5。现在任务 5 目前没有被执行,事实上永远不会被执行,因为任务 0 到 4 仍在阻塞你的 FixedThreadPool,阻塞在 run()
方法中的 .take()
等待新的工作来自 .put(i)
,他们永远也得不到。
这个错误是您代码中的一个基本设计缺陷,有无数种方法可以修复它,其中之一就是增加线程池大小。
我的建议是你回到绘图板重新考虑主要方法中的结构。
既然你发布了你的代码,有一些提示:
1.:
发布你的整个代码可以被解释为调用 'pls fix my code',我们鼓励你省略所有不必要的细节(比如所有那些 getter 和 setter)。也许检查 https://whosebug.com/help/minimal-reproducible-example
2.:
在同一个正文中发布两个 类 让事情有点复杂。下次分吧。
3.:(吹毛求疵)
processes.get(roundRobinIndex++).getElements().put(i);
像你在这里做的那样组合两个操作是不好的风格,因为它会使你的代码对其他人来说更难阅读。你可以只写:
processes.get(i % noOfProcesses).getElements().put(i);
package com.playground.concurrency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class MyRunnable implements Runnable {
private String taskName;
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
private int processed = 0;
public MyRunnable(String name) {
this.taskName = name;
}
private boolean keepRunning = true;
public boolean isKeepRunning() {
return keepRunning;
}
public void setKeepRunning(boolean keepRunning) {
this.keepRunning = keepRunning;
}
private BlockingQueue<Integer> elements = new LinkedBlockingQueue<Integer>(10);
public BlockingQueue<Integer> getElements() {
return elements;
}
public void setElements(BlockingQueue<Integer> elements) {
this.elements = elements;
}
@Override
public void run() {
while (keepRunning || !elements.isEmpty()) {
try {
Integer element = elements.take();
Thread.sleep(10);
System.out.println(taskName +" :: "+elements.size());
System.out.println("Got :: " + element);
processed++;
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("Exiting thread");
}
public int getProcessed() {
return processed;
}
public void setProcessed(int processed) {
this.processed = processed;
}
}
package com.playground.concurrency.service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.playground.concurrency.MyRunnable;
public class TestService {
public static void main(String[] args) throws InterruptedException {
int roundRobinIndex = 0;
int noOfProcess = 10;
List<MyRunnable> processes = new ArrayList<MyRunnable>();
for (int i = 0; i < noOfProcess; i++) {
processes.add(new MyRunnable("Task : " + i));
}
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(5);
for (MyRunnable process : processes) {
threadPoolExecutor.execute(process);
}
int totalMessages = 1000;
long start = System.currentTimeMillis();
for (int i = 1; i <= totalMessages; i++) {
processes.get(roundRobinIndex++).getElements().put(i);
if (roundRobinIndex == noOfProcess) {
roundRobinIndex = 0;
}
}
System.out.println("Done putting all the elements");
for (MyRunnable process : processes) {
process.setKeepRunning(false);
}
threadPoolExecutor.shutdown();
try {
threadPoolExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
long totalProcessed = 0;
for (MyRunnable process : processes) {
System.out.println("task " + process.getTaskName() + " processd " + process.getProcessed());
totalProcessed += process.getProcessed();
}
long end = System.currentTimeMillis();
System.out.println("total time" + (end - start));
}
}
我有一个从 LinkedBlockingQueue
中读取元素的简单任务。我创建了这些任务的多个实例并按 ExecutorService
执行。当 noOfProcess
和线程池大小相同时,此程序按预期工作。(例如:noOfProcess=10
和 thread pool size=10
)。
但是,如果 noOfProcess=10
和 thread pool size =5
则主线程在处理了一些项目后继续在下面的行等待。
processes.get(roundRobinIndex++).getElements().put(i);
我做错了什么?
要修复该行为,您需要执行以下操作之一:
- 有足够的 Runnable,每个都有足够的队列容量来接收所有 1,000 条消息(例如:100 个容量为 10 或以上的 Runnable;或 10 个容量为 100 或以上的 Runnable),或
- 有一个足够大的线程池来容纳所有的 Runnables,这样每个 Runnables 都可以启动 运行ning。
如果其中之一没有发生,ExecutorService 将不会启动额外的 Runnable。主工作线程将继续向每个队列添加项目,包括非 运行ning Runnables 的项目,直到它遇到一个已满的队列,此时它会阻塞。对于 10 个 Runnable 和线程池大小 5,第一个填满的队列将是第 6 个 Runnable。如果您只有 6 个 Runnable,这也是一样的。重要的一点是,您的 Runnable 至少比线程池中的空间多一个。
来自newFixedThreadPool() Javadoc:
If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.
考虑一个包含 2 个进程和 1 的线程池大小的更简单示例。您将被允许创建第一个进程并将其提交给 ExecutorService(因此 ExecutorService 将启动并 运行 它)。但是,ExecutorService 不允许第二个进程 运行。但是,您的主线程不会注意这一点,它会继续将元素放入第二个进程的队列中,即使没有任何东西在消耗它。
您的代码适用于 noOfProcess=10
和 thread pool size=5
– 如果您还将队列大小更改为 100,如下所示:new LinkedBlockingQueue<>(100)
.
您可以观察到这种行为——非运行ning Runnable 的队列已满——如果您更改此行:
processes.get(roundRobinIndex++).getElements().put(i);
对此(这是相同的逻辑代码,但保存了对象引用以供在 println()
输出中使用):
MyRunnable runnable = processes.get(roundRobinIndex++);
BlockingQueue<Integer> elements = runnable.getElements();
System.out.println("attempt to put() for " + runnable.getTaskName() + " with " + elements.size() + " elements");
elements.put(i);
啊是的。好旧的僵局。
发生的情况是:您向 ExecutorService 提交 10 个任务,然后通过 .put(i)
发送作业。当任务 5 的队列已满时,这会按预期阻塞任务 5。现在任务 5 目前没有被执行,事实上永远不会被执行,因为任务 0 到 4 仍在阻塞你的 FixedThreadPool,阻塞在 run()
方法中的 .take()
等待新的工作来自 .put(i)
,他们永远也得不到。
这个错误是您代码中的一个基本设计缺陷,有无数种方法可以修复它,其中之一就是增加线程池大小。
我的建议是你回到绘图板重新考虑主要方法中的结构。
既然你发布了你的代码,有一些提示:
1.: 发布你的整个代码可以被解释为调用 'pls fix my code',我们鼓励你省略所有不必要的细节(比如所有那些 getter 和 setter)。也许检查 https://whosebug.com/help/minimal-reproducible-example
2.: 在同一个正文中发布两个 类 让事情有点复杂。下次分吧。
3.:(吹毛求疵)
processes.get(roundRobinIndex++).getElements().put(i);
像你在这里做的那样组合两个操作是不好的风格,因为它会使你的代码对其他人来说更难阅读。你可以只写:
processes.get(i % noOfProcesses).getElements().put(i);