Java 中生产者-消费者更好的成语是什么?
What is a better idiom for producer-consumers in Java?
我想逐行读取文件,对每一行做一些可以轻松并行完成的缓慢操作,然后将结果逐行写入文件。我不关心输出的顺序。输入和输出太大,内存放不下。我希望能够同时对线程数 运行 以及内存中的行数设置硬限制。
我用于文件 IO 的库 (Apache Commons CSV) 似乎不提供同步文件访问,因此我认为我无法同时从多个线程读取或写入同一文件。如果可能的话,我会创建一个 ThreadPoolExecutor 并为每一行提供一个任务,它会简单地读取该行,执行计算并写入结果。
相反,我认为我需要的是一个执行解析的单线程、一个用于解析输入行的有界队列、一个包含执行计算的作业的线程池、一个用于计算输出行的有界队列,以及一个写的线程。一个生产者,很多消费者-生产者和一个消费者,如果这是有道理的话。
我得到的是这样的:
BlockingQueue<CSVRecord> inputQueue = new ArrayBlockingQueue<CSVRecord>(INPUT_QUEUE_SIZE);
BlockingQueue<String[]> outputQueue = new ArrayBlockingQueue<String[]>(OUTPUT_QUEUE_SIZE);
Thread parserThread = new Thread(() -> {
while (inputFileIterator.hasNext()) {
CSVRecord record = inputFileIterator.next();
parsedQueue.put(record); // blocks if queue is full
}
});
// the job queue of the thread pool has to be bounded too, otherwise all
// the objects in the input queue will be given to jobs immediately and
// I'll run out of heap space
// source:
BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<Runnable>(JOB_QUEUE_SIZE);
RejectedExecutionHandler rejectedExecutionHandler
= new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executorService
= new ThreadPoolExecutor(
NUMBER_OF_THREADS,
NUMBER_OF_THREADS,
0L,
TimeUnit.MILLISECONDS,
jobQueue,
rejectedExecutionHandler
);
Thread processingBossThread = new Thread(() -> {
while (!inputQueue.isEmpty() || parserThread.isAlive()) {
CSVRecord record = inputQueue.take(); // blocks if queue is empty
executorService.execute(() -> {
String[] array = this.doStuff(record);
outputQueue.put(array); // blocks if queue is full
});
}
// getting here that means that all CSV rows have been read and
// added to the processing queue
executorService.shutdown(); // do not accept any new tasks
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
// wait for existing tasks to finish
});
Thread writerThread = new Thread(() -> {
while (!outputQueue.isEmpty() || consumerBossThread.isAlive()) {
String[] outputRow = outputQueue.take(); // blocks if queue is empty
outputFileWriter.printRecord((Object[]) outputRow);
});
parserThread.start();
consumerBossThread.start();
writerThread.start();
// wait until writer thread has finished
writerThread.join();
我省略了日志记录和异常处理,因此看起来比实际要短得多。
此解决方案有效,但我对此并不满意。必须创建我自己的线程,检查它们的 isAlive(),在 Runnable 中创建一个 Runnable,当我真的只想等到所有 worker 都完成时被迫指定一个超时等似乎很老套。总而言之一个 100 多行的方法,如果我让 Runnables 成为它们自己的 类,甚至是几百行代码,这似乎是一个非常基本的模式。
有更好的解决办法吗?我想尽可能多地使用 Java 的库,以帮助保持我的代码可维护并符合最佳实践。我仍然想知道它在幕后做了什么,但我怀疑自己实现所有这些是最好的方法。
更新:
更好的解决方案,根据答案的建议:
BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<Runnable>(JOB_QUEUE_SIZE);
RejectedExecutionHandler rejectedExecutionHandler
= new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executorService
= new ThreadPoolExecutor(
NUMBER_OF_THREADS,
NUMBER_OF_THREADS,
0L,
TimeUnit.MILLISECONDS,
jobQueue,
rejectedExecutionHandler
);
while (it.hasNext()) {
CSVRecord record = it.next();
executorService.execute(() -> {
String[] array = this.doStuff(record);
synchronized (writer) {
writer.printRecord((Object[]) array);
}
});
}
我想先指出一点,我可以想到三种可能的情况:
1.-对于一个文件的所有行,使用doStuff方法处理一行所需的时间大于从磁盘读取同一行并解析它所花费的时间
2.- 对于文件的所有行,使用doStuff方法处理一行所需的时间低于或等于读取同一行并解析它所花费的时间.
3.- 同一个文件的第一个和第二个场景都不是。
您的解决方案应该适用于第一种情况,但不适用于第二种或第三种情况,而且您没有以同步方式修改队列。更重要的是,如果您遇到类似第 2 种情况,那么当没有数据要发送到输出,或者没有行要发送到要发送的队列时,您就是在浪费 cpu 个周期由 doStuff 处理,旋转于:
while (!outputQueue.isEmpty() || consumerBossThread.isAlive()) {
最后,无论您遇到哪种情况,我都建议您使用 Monitor 对象,这将允许您让特定线程等待,直到另一个进程通知它们某个条件为真并且它们可以再次被激活。通过使用 Monitor 对象,您不会浪费 cpu 个周期。
有关详细信息,请参阅:
https://docs.oracle.com/javase/7/docs/api/javax/management/monitor/Monitor.html
编辑:我删除了使用同步方法的建议,因为正如您所指出的,BlockingQueue 的方法是线程安全的(或几乎所有方法)并且可以防止竞争条件。
使用 ThreadPoolExecutor
绑定到固定大小的阻塞队列,您所有的复杂性都会在 JavaDoc 中消失。
只需一个线程读取文件并吞噬阻塞队列,所有处理都由执行器完成。
附录:
您可以在您的编写器上进行同步,或者简单地使用另一个队列,处理器填充它,并且您的单个写入线程使用该队列。
在编写器上同步很可能是最简单的方法。
我想逐行读取文件,对每一行做一些可以轻松并行完成的缓慢操作,然后将结果逐行写入文件。我不关心输出的顺序。输入和输出太大,内存放不下。我希望能够同时对线程数 运行 以及内存中的行数设置硬限制。
我用于文件 IO 的库 (Apache Commons CSV) 似乎不提供同步文件访问,因此我认为我无法同时从多个线程读取或写入同一文件。如果可能的话,我会创建一个 ThreadPoolExecutor 并为每一行提供一个任务,它会简单地读取该行,执行计算并写入结果。
相反,我认为我需要的是一个执行解析的单线程、一个用于解析输入行的有界队列、一个包含执行计算的作业的线程池、一个用于计算输出行的有界队列,以及一个写的线程。一个生产者,很多消费者-生产者和一个消费者,如果这是有道理的话。
我得到的是这样的:
BlockingQueue<CSVRecord> inputQueue = new ArrayBlockingQueue<CSVRecord>(INPUT_QUEUE_SIZE);
BlockingQueue<String[]> outputQueue = new ArrayBlockingQueue<String[]>(OUTPUT_QUEUE_SIZE);
Thread parserThread = new Thread(() -> {
while (inputFileIterator.hasNext()) {
CSVRecord record = inputFileIterator.next();
parsedQueue.put(record); // blocks if queue is full
}
});
// the job queue of the thread pool has to be bounded too, otherwise all
// the objects in the input queue will be given to jobs immediately and
// I'll run out of heap space
// source:
BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<Runnable>(JOB_QUEUE_SIZE);
RejectedExecutionHandler rejectedExecutionHandler
= new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executorService
= new ThreadPoolExecutor(
NUMBER_OF_THREADS,
NUMBER_OF_THREADS,
0L,
TimeUnit.MILLISECONDS,
jobQueue,
rejectedExecutionHandler
);
Thread processingBossThread = new Thread(() -> {
while (!inputQueue.isEmpty() || parserThread.isAlive()) {
CSVRecord record = inputQueue.take(); // blocks if queue is empty
executorService.execute(() -> {
String[] array = this.doStuff(record);
outputQueue.put(array); // blocks if queue is full
});
}
// getting here that means that all CSV rows have been read and
// added to the processing queue
executorService.shutdown(); // do not accept any new tasks
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
// wait for existing tasks to finish
});
Thread writerThread = new Thread(() -> {
while (!outputQueue.isEmpty() || consumerBossThread.isAlive()) {
String[] outputRow = outputQueue.take(); // blocks if queue is empty
outputFileWriter.printRecord((Object[]) outputRow);
});
parserThread.start();
consumerBossThread.start();
writerThread.start();
// wait until writer thread has finished
writerThread.join();
我省略了日志记录和异常处理,因此看起来比实际要短得多。
此解决方案有效,但我对此并不满意。必须创建我自己的线程,检查它们的 isAlive(),在 Runnable 中创建一个 Runnable,当我真的只想等到所有 worker 都完成时被迫指定一个超时等似乎很老套。总而言之一个 100 多行的方法,如果我让 Runnables 成为它们自己的 类,甚至是几百行代码,这似乎是一个非常基本的模式。
有更好的解决办法吗?我想尽可能多地使用 Java 的库,以帮助保持我的代码可维护并符合最佳实践。我仍然想知道它在幕后做了什么,但我怀疑自己实现所有这些是最好的方法。
更新: 更好的解决方案,根据答案的建议:
BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<Runnable>(JOB_QUEUE_SIZE);
RejectedExecutionHandler rejectedExecutionHandler
= new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executorService
= new ThreadPoolExecutor(
NUMBER_OF_THREADS,
NUMBER_OF_THREADS,
0L,
TimeUnit.MILLISECONDS,
jobQueue,
rejectedExecutionHandler
);
while (it.hasNext()) {
CSVRecord record = it.next();
executorService.execute(() -> {
String[] array = this.doStuff(record);
synchronized (writer) {
writer.printRecord((Object[]) array);
}
});
}
我想先指出一点,我可以想到三种可能的情况:
1.-对于一个文件的所有行,使用doStuff方法处理一行所需的时间大于从磁盘读取同一行并解析它所花费的时间
2.- 对于文件的所有行,使用doStuff方法处理一行所需的时间低于或等于读取同一行并解析它所花费的时间.
3.- 同一个文件的第一个和第二个场景都不是。
您的解决方案应该适用于第一种情况,但不适用于第二种或第三种情况,而且您没有以同步方式修改队列。更重要的是,如果您遇到类似第 2 种情况,那么当没有数据要发送到输出,或者没有行要发送到要发送的队列时,您就是在浪费 cpu 个周期由 doStuff 处理,旋转于:
while (!outputQueue.isEmpty() || consumerBossThread.isAlive()) {
最后,无论您遇到哪种情况,我都建议您使用 Monitor 对象,这将允许您让特定线程等待,直到另一个进程通知它们某个条件为真并且它们可以再次被激活。通过使用 Monitor 对象,您不会浪费 cpu 个周期。
有关详细信息,请参阅: https://docs.oracle.com/javase/7/docs/api/javax/management/monitor/Monitor.html
编辑:我删除了使用同步方法的建议,因为正如您所指出的,BlockingQueue 的方法是线程安全的(或几乎所有方法)并且可以防止竞争条件。
使用 ThreadPoolExecutor
绑定到固定大小的阻塞队列,您所有的复杂性都会在 JavaDoc 中消失。
只需一个线程读取文件并吞噬阻塞队列,所有处理都由执行器完成。
附录:
您可以在您的编写器上进行同步,或者简单地使用另一个队列,处理器填充它,并且您的单个写入线程使用该队列。
在编写器上同步很可能是最简单的方法。