如何将 List<LinkedBlockingQueue<Long>> 提交给 ThreadPoolExecutor 并且每个线程将选择一个 LinkedBlockingQueue 并并行执行
How to submit List<LinkedBlockingQueue<Long>> to ThreadPoolExecutor and each thread will pick one LinkedBlockingQueue and execute it parallel
我将 Long 类型的 LinkedBlockingQueue 列表提交给 ThreadPoolExecutor,条件应该是每个线程选择 long 的 LinkedBlockingQueue 并并行执行
这是我的方法逻辑
public void doParallelProcess() {
List<LinkedBlockingQueue<Long>> linkedBlockingQueueList = splitListtoBlockingQueues();
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, linkedBlockingQueueList.size(), 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());
Long initial = System.currentTimeMillis();
try {
System.out.println("linkedBlockingQueueList begin size is " + linkedBlockingQueueList.size() + "is empty"
+ linkedBlockingQueueList.isEmpty());
while (true) {
linkedBlockingQueueList.parallelStream().parallel().filter(q -> !q.isEmpty()).forEach(queue -> {
Long id = queue.poll();
MyTestRunnable runnab = new MyTestRunnable(id);
executor.execute(runnab);
System.out.println("Task Count: " + executor.getTaskCount() + ", Completed Task Count: "
+ executor.getCompletedTaskCount() + ", Active Task Count: " + executor.getActiveCount());
});
System.out.println("linkedBlockingQueueList end size is " + linkedBlockingQueueList.size() + "is empty"
+ linkedBlockingQueueList.isEmpty());
System.out.println("executor service " + executor);
if (executor.getCompletedTaskCount() == (long) mainList.size()) {
break;
}
while (executor.getActiveCount() != 0) {
System.out.println("Task Count: " + executor.getTaskCount() + ", Completed Task Count: "
+ executor.getCompletedTaskCount() + ", Active Task Count: " + executor.getActiveCount());
Thread.sleep(1000L);
}
}
} catch (Exception e) {
} finally {
executor.shutdown();
while (!executor.isTerminated()) {
}
}
} `
如何将 LinkedBlockingQueue 列表提交给单个线程
示例:
List<LinkedBlockingQueue<Long>>
每个 LinkedBlockingQueue
包含 50 个队列数据
- 大小
List<LinkedBlockingQueue<Long>>
是 50
- 每个线程应该选择一个
LinkedBlockingQueue<Long>
并执行50个队列
任务。
Executors
class 是线程池的主要入口:
ExecutorService executor = Executors.newCachedThreadPool();
linkedBlockingQueueList.forEach(queue -> executor.submit(() -> { /* process queue */ }));
如果您确实想自己创建一个 ThreadPoolExecutor
— 它确实可以让您更好地控制配置 — 至少有两种方法可以指定默认线程工厂:
省略线程工厂参数:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, linkedBlockingQueueList.size(),
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
再次使用 Executors
class 获取默认线程工厂:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, linkedBlockingQueueList.size(),
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
Executors.defaultThreadFactory());
ExecutorService
的输入是 Runnable
或 Callable
。您提交的任何任务都需要实现这两个接口之一。如果您想将一堆任务提交到线程池并等待它们全部完成,那么您可以使用 invokeAll method and loop over the resulting Future
s, calling get
on each: see this informative answer 来解决类似的问题。
不过,您不需要将输入任务分组。您永远不希望执行程序服务在仍有工作要做时有空闲线程!您希望它能够在资源释放后立即抓取下一个任务,而以这种方式进行批处理与此相反。您的代码是这样做的:
while non-empty input lists exist {
for each non-empty input list L {
t = new Runnable(L.pop())
executor.submit(t)
}
while (executor.hasTasks()) {
wait
}
}
其中一项任务完成后,该线程应该可以自由地转到其他工作。但不会,因为您等到所有 N 个任务都完成后再提交。使用 invokeAll
一次提交它们,让执行程序服务执行其构建的任务。
我将 Long 类型的 LinkedBlockingQueue 列表提交给 ThreadPoolExecutor,条件应该是每个线程选择 long 的 LinkedBlockingQueue 并并行执行
这是我的方法逻辑
public void doParallelProcess() {
List<LinkedBlockingQueue<Long>> linkedBlockingQueueList = splitListtoBlockingQueues();
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, linkedBlockingQueueList.size(), 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());
Long initial = System.currentTimeMillis();
try {
System.out.println("linkedBlockingQueueList begin size is " + linkedBlockingQueueList.size() + "is empty"
+ linkedBlockingQueueList.isEmpty());
while (true) {
linkedBlockingQueueList.parallelStream().parallel().filter(q -> !q.isEmpty()).forEach(queue -> {
Long id = queue.poll();
MyTestRunnable runnab = new MyTestRunnable(id);
executor.execute(runnab);
System.out.println("Task Count: " + executor.getTaskCount() + ", Completed Task Count: "
+ executor.getCompletedTaskCount() + ", Active Task Count: " + executor.getActiveCount());
});
System.out.println("linkedBlockingQueueList end size is " + linkedBlockingQueueList.size() + "is empty"
+ linkedBlockingQueueList.isEmpty());
System.out.println("executor service " + executor);
if (executor.getCompletedTaskCount() == (long) mainList.size()) {
break;
}
while (executor.getActiveCount() != 0) {
System.out.println("Task Count: " + executor.getTaskCount() + ", Completed Task Count: "
+ executor.getCompletedTaskCount() + ", Active Task Count: " + executor.getActiveCount());
Thread.sleep(1000L);
}
}
} catch (Exception e) {
} finally {
executor.shutdown();
while (!executor.isTerminated()) {
}
}
} `
如何将 LinkedBlockingQueue 列表提交给单个线程 示例:
List<LinkedBlockingQueue<Long>>
每个 LinkedBlockingQueue 包含 50 个队列数据- 大小
List<LinkedBlockingQueue<Long>>
是 50 - 每个线程应该选择一个
LinkedBlockingQueue<Long>
并执行50个队列 任务。
Executors
class 是线程池的主要入口:
ExecutorService executor = Executors.newCachedThreadPool();
linkedBlockingQueueList.forEach(queue -> executor.submit(() -> { /* process queue */ }));
如果您确实想自己创建一个 ThreadPoolExecutor
— 它确实可以让您更好地控制配置 — 至少有两种方法可以指定默认线程工厂:
省略线程工厂参数:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, linkedBlockingQueueList.size(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
再次使用
Executors
class 获取默认线程工厂:ThreadPoolExecutor executor = new ThreadPoolExecutor(1, linkedBlockingQueueList.size(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());
ExecutorService
的输入是 Runnable
或 Callable
。您提交的任何任务都需要实现这两个接口之一。如果您想将一堆任务提交到线程池并等待它们全部完成,那么您可以使用 invokeAll method and loop over the resulting Future
s, calling get
on each: see this informative answer 来解决类似的问题。
不过,您不需要将输入任务分组。您永远不希望执行程序服务在仍有工作要做时有空闲线程!您希望它能够在资源释放后立即抓取下一个任务,而以这种方式进行批处理与此相反。您的代码是这样做的:
while non-empty input lists exist {
for each non-empty input list L {
t = new Runnable(L.pop())
executor.submit(t)
}
while (executor.hasTasks()) {
wait
}
}
其中一项任务完成后,该线程应该可以自由地转到其他工作。但不会,因为您等到所有 N 个任务都完成后再提交。使用 invokeAll
一次提交它们,让执行程序服务执行其构建的任务。