固定线程池:暂停直到线程可用?
Fixed thread pool: pause until thread available?
我正在开发一个基于 Java 的程序,该程序从文件中读取并将每一行发送到其自己的处理 Runnable 中以执行一些单线程计算。我正在使用一个固定的线程池,每个可用核心一个任务,来并行化它。该文件很大,提交作业时不可能将每个文件都加载到内存中。是否可以让主线程(正在提交这些任务)暂停,直到线程池中的线程可用?
创建一个简单的线程池,其中包含与池相同的可用工作线程size.before 提交检查是否存在可用线程,然后提交否则等待锁定。
您也可以使用信号量,它会阻塞直到 acquire() 获得一些值。
信号量示例:
Semaphore semaphore = new Semaphore(pool_size);
//critical section
semaphore.acquire();
...
semaphore.release();
简单线程池示例:
private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
提交方式
public boolean submit(Runnable runnable) {
if (runnable == null) {
return false;
}
synchronized (Lock) {
handoffPending = true;
// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
Lock.wait(500);
} catch (InterruptedException ignore) {
}
}
if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the pool).
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
Lock.notifyAll();
}
return true;
}
我正在开发一个基于 Java 的程序,该程序从文件中读取并将每一行发送到其自己的处理 Runnable 中以执行一些单线程计算。我正在使用一个固定的线程池,每个可用核心一个任务,来并行化它。该文件很大,提交作业时不可能将每个文件都加载到内存中。是否可以让主线程(正在提交这些任务)暂停,直到线程池中的线程可用?
创建一个简单的线程池,其中包含与池相同的可用工作线程size.before 提交检查是否存在可用线程,然后提交否则等待锁定。
您也可以使用信号量,它会阻塞直到 acquire() 获得一些值。 信号量示例:
Semaphore semaphore = new Semaphore(pool_size);
//critical section
semaphore.acquire();
...
semaphore.release();
简单线程池示例:
private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
提交方式
public boolean submit(Runnable runnable) {
if (runnable == null) {
return false;
}
synchronized (Lock) {
handoffPending = true;
// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
Lock.wait(500);
} catch (InterruptedException ignore) {
}
}
if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the pool).
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
Lock.notifyAll();
}
return true;
}