哪种线程机制用于将其他任务排入队列的任务?
Which threading mechanism to use for tasks that enqueue other tasks?
我正在使用创建其他任务的任务。这些任务可能会或可能不会创建后续任务。我事先不知道总共会创建多少个任务。在某个时候,不再创建任何任务,所有任务都将完成。
当最后一个任务完成后,我必须做一些额外的事情。
应该使用哪种线程机制?我读过 CountDownLatch, Cyclic Barrier and Phaser,但 none 似乎合适。
我也尝试过使用ExecutorService,但是我遇到了一些问题,比如最后无法执行某些东西,你可以在下面看到我的尝试:
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class Issue {
public static void main(String[] args) throws InterruptedException {
var count = new AtomicInteger(1);
var executor = Executors.newFixedThreadPool(3);
class Task implements Runnable {
final int id = count.getAndIncrement();
@Override
public void run() {
try {
MILLISECONDS.sleep((long)(Math.random() * 1000L + 1000L));
} catch (InterruptedException e) {
// Do nothing
}
if (id < 5) {
executor.submit(new Task());
executor.submit(new Task());
}
System.out.println(id);
}
}
executor.execute(new Task());
executor.shutdown();
// executor.awaitTermination(20, TimeUnit.SECONDS);
System.out.println("Hello");
}
}
这会输出异常,因为任务是在调用 shutdown()
之后添加的,但预期输出类似于:
1
2
3
4
5
6
7
8
9
Hello
哪种线程机制可以帮助我做到这一点?
这似乎很棘手。如果甚至有一个任务在队列中或当前正在执行,那么由于您不能说它是否会产生另一个任务,因此您无法知道它可能 运行 多长时间。这可能是需要另外 2 小时的一系列任务的开始。
我认为您实现此目标所需的所有信息都已封装在执行程序实现中。您需要知道 运行ning 和队列中的内容。
我认为您很遗憾地考虑必须编写自己的执行程序。它不需要很复杂,如果你不想的话,它也不必符合 JDK 的接口。只是维护线程池和任务队列的东西。添加将侦听器附加到执行程序的功能。当队列为空并且没有正在执行的任务时,您可以通知监听器。
这是一个快速代码草图。
class MyExecutor
{
private final AtomicLong taskId = new AtomicLong();
private final Map<Long, Runnable> idToQueuedTask = new ConcurrentHashMap<>();
private final AtomicLong runningTasks = new AtomicLong();
private final ExecutorService delegate = Executors.newFixedThreadPool(3);
public void submit(Runnable task) {
long id = taskId.incrementAndGet();
final Runnable wrapped = () -> {
taskStarted(id);
try {
task.run();
}
finally {
taskEnded();
}
};
idToQueuedTask.put(id, wrapped);
delegate.submit(wrapped);
}
private void taskStarted(long id) {
idToQueuedTask.remove(id);
runningTasks.incrementAndGet();
}
private void taskEnded() {
final long numRunning = runningTasks.decrementAndGet();
if (numRunning == 0 && idToQueuedTask.isEmpty()) {
System.out.println("Done, time to notify listeners");
}
}
public static void main(String[] args) {
MyExecutor executor = new MyExecutor();
executor.submit(() -> {
System.out.println("Parent task");
try {
Thread.sleep(1000);
}
catch (Exception e) {}
executor.submit(() -> {
System.out.println("Child task");
});
});
}
}
如果您将 ExecutorService
更改为:
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
然后您可以使用计数函数等待:
while(executor.getTaskCount() > executor.getCompletedTaskCount())
{
TimeUnit.SECONDS.sleep(10L);
}
executor.shutdown();
System.out.println("Hello");
我正在使用创建其他任务的任务。这些任务可能会或可能不会创建后续任务。我事先不知道总共会创建多少个任务。在某个时候,不再创建任何任务,所有任务都将完成。
当最后一个任务完成后,我必须做一些额外的事情。
应该使用哪种线程机制?我读过 CountDownLatch, Cyclic Barrier and Phaser,但 none 似乎合适。
我也尝试过使用ExecutorService,但是我遇到了一些问题,比如最后无法执行某些东西,你可以在下面看到我的尝试:
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class Issue {
public static void main(String[] args) throws InterruptedException {
var count = new AtomicInteger(1);
var executor = Executors.newFixedThreadPool(3);
class Task implements Runnable {
final int id = count.getAndIncrement();
@Override
public void run() {
try {
MILLISECONDS.sleep((long)(Math.random() * 1000L + 1000L));
} catch (InterruptedException e) {
// Do nothing
}
if (id < 5) {
executor.submit(new Task());
executor.submit(new Task());
}
System.out.println(id);
}
}
executor.execute(new Task());
executor.shutdown();
// executor.awaitTermination(20, TimeUnit.SECONDS);
System.out.println("Hello");
}
}
这会输出异常,因为任务是在调用 shutdown()
之后添加的,但预期输出类似于:
1
2
3
4
5
6
7
8
9
Hello
哪种线程机制可以帮助我做到这一点?
这似乎很棘手。如果甚至有一个任务在队列中或当前正在执行,那么由于您不能说它是否会产生另一个任务,因此您无法知道它可能 运行 多长时间。这可能是需要另外 2 小时的一系列任务的开始。
我认为您实现此目标所需的所有信息都已封装在执行程序实现中。您需要知道 运行ning 和队列中的内容。
我认为您很遗憾地考虑必须编写自己的执行程序。它不需要很复杂,如果你不想的话,它也不必符合 JDK 的接口。只是维护线程池和任务队列的东西。添加将侦听器附加到执行程序的功能。当队列为空并且没有正在执行的任务时,您可以通知监听器。
这是一个快速代码草图。
class MyExecutor
{
private final AtomicLong taskId = new AtomicLong();
private final Map<Long, Runnable> idToQueuedTask = new ConcurrentHashMap<>();
private final AtomicLong runningTasks = new AtomicLong();
private final ExecutorService delegate = Executors.newFixedThreadPool(3);
public void submit(Runnable task) {
long id = taskId.incrementAndGet();
final Runnable wrapped = () -> {
taskStarted(id);
try {
task.run();
}
finally {
taskEnded();
}
};
idToQueuedTask.put(id, wrapped);
delegate.submit(wrapped);
}
private void taskStarted(long id) {
idToQueuedTask.remove(id);
runningTasks.incrementAndGet();
}
private void taskEnded() {
final long numRunning = runningTasks.decrementAndGet();
if (numRunning == 0 && idToQueuedTask.isEmpty()) {
System.out.println("Done, time to notify listeners");
}
}
public static void main(String[] args) {
MyExecutor executor = new MyExecutor();
executor.submit(() -> {
System.out.println("Parent task");
try {
Thread.sleep(1000);
}
catch (Exception e) {}
executor.submit(() -> {
System.out.println("Child task");
});
});
}
}
如果您将 ExecutorService
更改为:
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
然后您可以使用计数函数等待:
while(executor.getTaskCount() > executor.getCompletedTaskCount())
{
TimeUnit.SECONDS.sleep(10L);
}
executor.shutdown();
System.out.println("Hello");