当另一个任务完成时,强制 BufferedWriter 从 BlockingQueue 写入
Force BufferedWriter to write from BlockingQueue when another tasks finished
我正在用 JSoup 编写简单的 html 解析器。我有大约 50,000 个链接要检查,所以我认为这是学习线程和并发性的好机会。我在 ExecutorService 中注册了 8 个任务:其中 6 个解析指向存储在 ArrayLists 中的某些数据的链接,然后将其添加到 BlockingQueues。其中两个任务是基于 BufferedWriter 的文件编写器。问题是当我的 6 个任务完成所有链接时,文件编写器停止从 BlockingQueue 写入数据,所以我丢失了部分数据。我是 java 的新手,所以如果你能帮帮我....代码:
主文件:
public static void main(String[] args) {
BlockingQueue<ArrayList<String>> units = new ArrayBlockingQueue<ArrayList<String>>(50, true);
BlockingQueue<ArrayList<String>> subjects = new ArrayBlockingQueue<ArrayList<String>>(50, true);
File subjectFile = new File("lekarze.csv");
File unitFile = new File("miejsca.csv");
ExecutorService executor = Executors.newFixedThreadPool(9);
executor.submit(new Thread(new FileSaver(subjects, subjectFile)));
executor.submit(new Thread(new FileSaver(units, unitFile)));
for(int i = 29323; i < 29400; i++){
executor.submit(new ParserDocsThread(i, subjects, units, errors));
}
executor.shutdown();
}
文件保护程序class:
package parser;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
public class FileSaver implements Runnable {
private BlockingQueue<ArrayList<String>> toWrite = null;
private File outputFile = null;
private BufferedWriter writer = null;
public FileSaver(BlockingQueue<ArrayList<String>> queue, File file){
toWrite = queue;
outputFile = file;
}
public void run() {
try {
writer = new BufferedWriter(new FileWriter(outputFile, true));
while(true){
try{
save(toWrite.take());
} catch(InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void save(ArrayList<String> data){
String temp ="";
int size = data.size();
for(int i = 0; i < size; i++){
temp += data.get(i);
if(i != size - 1) temp += '\t';
}
try {
writer.write(temp);
writer.newLine();
} catch (IOException e) {
e.printStackTrace();
}
}
}
在 ParserDocsThread 中,我只使用 put() 方法将元素添加到 BlockingQueue。
您的消费者线程没有干净地结束,因为 take()
调用正在等待一个新数组,并且没有关闭缓冲的写入器。 ServiceExecutor 放弃等待这些线程完成,并杀死它们。这导致写入器中的最后几行未写出到磁盘。
您应该使用 poll(10, TimeUnit.SECONDS)
(但要有适当的超时)。超时后,您的消费者将放弃生产者,您应该确保正确关闭缓冲写入器,以便正确打印缓冲区的最后一部分。
try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputFile, true)))
{
while(true){
List<String> data = toWrite.poll(10, TimeUnit.SECONDS);
if (data == null) {
break;
}
save(data, writer);
}
} catch (...) {
}
我已经把这里的缓冲写入器放入一个try-with-resources(所以这里的try会自动关闭写入器)并将它传递给你的保存方法,但你可以按照你的方式进行,然后手动关闭如果需要,可以在 finally 块中写入作者:
try {
...
} catch(...) {
} finally {
writer.close(); // Closes and flushes out the remaining lines
}
您可能还想在执行服务器上调用 awaitTermination
(例如:How to wait for all threads to finish, using ExecutorService?),等待时间大于您的轮询超时。
我正在用 JSoup 编写简单的 html 解析器。我有大约 50,000 个链接要检查,所以我认为这是学习线程和并发性的好机会。我在 ExecutorService 中注册了 8 个任务:其中 6 个解析指向存储在 ArrayLists 中的某些数据的链接,然后将其添加到 BlockingQueues。其中两个任务是基于 BufferedWriter 的文件编写器。问题是当我的 6 个任务完成所有链接时,文件编写器停止从 BlockingQueue 写入数据,所以我丢失了部分数据。我是 java 的新手,所以如果你能帮帮我....代码:
主文件:
public static void main(String[] args) {
BlockingQueue<ArrayList<String>> units = new ArrayBlockingQueue<ArrayList<String>>(50, true);
BlockingQueue<ArrayList<String>> subjects = new ArrayBlockingQueue<ArrayList<String>>(50, true);
File subjectFile = new File("lekarze.csv");
File unitFile = new File("miejsca.csv");
ExecutorService executor = Executors.newFixedThreadPool(9);
executor.submit(new Thread(new FileSaver(subjects, subjectFile)));
executor.submit(new Thread(new FileSaver(units, unitFile)));
for(int i = 29323; i < 29400; i++){
executor.submit(new ParserDocsThread(i, subjects, units, errors));
}
executor.shutdown();
}
文件保护程序class:
package parser;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
public class FileSaver implements Runnable {
private BlockingQueue<ArrayList<String>> toWrite = null;
private File outputFile = null;
private BufferedWriter writer = null;
public FileSaver(BlockingQueue<ArrayList<String>> queue, File file){
toWrite = queue;
outputFile = file;
}
public void run() {
try {
writer = new BufferedWriter(new FileWriter(outputFile, true));
while(true){
try{
save(toWrite.take());
} catch(InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void save(ArrayList<String> data){
String temp ="";
int size = data.size();
for(int i = 0; i < size; i++){
temp += data.get(i);
if(i != size - 1) temp += '\t';
}
try {
writer.write(temp);
writer.newLine();
} catch (IOException e) {
e.printStackTrace();
}
}
}
在 ParserDocsThread 中,我只使用 put() 方法将元素添加到 BlockingQueue。
您的消费者线程没有干净地结束,因为 take()
调用正在等待一个新数组,并且没有关闭缓冲的写入器。 ServiceExecutor 放弃等待这些线程完成,并杀死它们。这导致写入器中的最后几行未写出到磁盘。
您应该使用 poll(10, TimeUnit.SECONDS)
(但要有适当的超时)。超时后,您的消费者将放弃生产者,您应该确保正确关闭缓冲写入器,以便正确打印缓冲区的最后一部分。
try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputFile, true)))
{
while(true){
List<String> data = toWrite.poll(10, TimeUnit.SECONDS);
if (data == null) {
break;
}
save(data, writer);
}
} catch (...) {
}
我已经把这里的缓冲写入器放入一个try-with-resources(所以这里的try会自动关闭写入器)并将它传递给你的保存方法,但你可以按照你的方式进行,然后手动关闭如果需要,可以在 finally 块中写入作者:
try {
...
} catch(...) {
} finally {
writer.close(); // Closes and flushes out the remaining lines
}
您可能还想在执行服务器上调用 awaitTermination
(例如:How to wait for all threads to finish, using ExecutorService?),等待时间大于您的轮询超时。