LinkedBlockingQueue only return 多个线程之一
LinkedBlockingQueue only return one of mupltiple threads
我做了一个 class 来计算同一目录中给定文件中的单词数。由于文件很大,我决定使用多线程来实现多个文件的计数。
当 运行 如下指定的 DriverClass 时,它会卡在线程 1。
我究竟做错了什么?当我迭代 queue.take() 时,人们会期望解析器等待检索某些内容并继续前进。卡在线程 1 使我怀疑将 () 放入队列时出错。
提前致谢!
驱动类:
public class WordCountTest {
public static void main(String[] args){
if (args.length<1){
System.out.println("Please specify, atleast, one file");
}
BlockingQueue<Integer> threadQueue = new LinkedBlockingQueue<>();
Runnable r;
Thread t;
for (int i = 0; i<args.length; i++){
r = new WordCount(args[i], threadQueue);
t = new Thread(r);
t.start();
int total = 0;
for (int k = 0; k<args.length; k++){
try {
total += threadQueue.take();
} catch (InterruptedException e){
}
}
System.out.println("Total wordcount: " + total);
}
}
}
WordCountClass:
public class WordCount implements Runnable {
private int myId = 0;
private String _file;
private BlockingQueue<Integer> _queue;
private static int id = 0;
public WordCount(String file, BlockingQueue<Integer> queue){
_queue = queue;
_file = file;
myId = ++id;
}
@Override
public void run() {
System.out.println("Thread " + myId + " running");
try {
_queue.put(countWord(_file));
} catch (InterruptedException e){
}
}
public int countWord(String file){
int count = 0;
try {
Scanner in = new Scanner(new FileReader(file));
while (in.hasNext()){
count++;
in.next();
}
} catch (IOException e){
System.out.println("File," + file + ",not found");
}
return count;
}
}
您正在等待第一个线程启动后的所有结果。也许您打算在所有线程启动后等待结果。
注意:如果创建的线程多于 CPU 的数量,则速度可能会变慢。我建议改用固定线程池。
问题是您使用的是嵌套循环,而您应该使用两个单独的循环:一个用于启动 WordCounts
,另一个用于收集结果,例如
public class WordCountTest {
public static void main(String[] args){
Queue<Integer> threadQueue = new ConcurrentLinkedQueue<>();
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CountDownLatch latch = new CountDownLatch(args.length);
for (int i = 0; i<args.length; i++){
CompletableFuture.runAsync(new WordCount(args[i], threadQueue), executor)
.thenRunAsync(latch.countDown(), executor);
}
latch.await();
int sum = 0;
for(Integer i : threadQueue) {
sum += i;
}
}
}
或者无论您希望如何实施,重点是您不应该在所有 WordCounts
开始之前开始收集结果。
我做了一个 class 来计算同一目录中给定文件中的单词数。由于文件很大,我决定使用多线程来实现多个文件的计数。
当 运行 如下指定的 DriverClass 时,它会卡在线程 1。 我究竟做错了什么?当我迭代 queue.take() 时,人们会期望解析器等待检索某些内容并继续前进。卡在线程 1 使我怀疑将 () 放入队列时出错。
提前致谢!
驱动类:
public class WordCountTest {
public static void main(String[] args){
if (args.length<1){
System.out.println("Please specify, atleast, one file");
}
BlockingQueue<Integer> threadQueue = new LinkedBlockingQueue<>();
Runnable r;
Thread t;
for (int i = 0; i<args.length; i++){
r = new WordCount(args[i], threadQueue);
t = new Thread(r);
t.start();
int total = 0;
for (int k = 0; k<args.length; k++){
try {
total += threadQueue.take();
} catch (InterruptedException e){
}
}
System.out.println("Total wordcount: " + total);
}
}
}
WordCountClass:
public class WordCount implements Runnable {
private int myId = 0;
private String _file;
private BlockingQueue<Integer> _queue;
private static int id = 0;
public WordCount(String file, BlockingQueue<Integer> queue){
_queue = queue;
_file = file;
myId = ++id;
}
@Override
public void run() {
System.out.println("Thread " + myId + " running");
try {
_queue.put(countWord(_file));
} catch (InterruptedException e){
}
}
public int countWord(String file){
int count = 0;
try {
Scanner in = new Scanner(new FileReader(file));
while (in.hasNext()){
count++;
in.next();
}
} catch (IOException e){
System.out.println("File," + file + ",not found");
}
return count;
}
}
您正在等待第一个线程启动后的所有结果。也许您打算在所有线程启动后等待结果。
注意:如果创建的线程多于 CPU 的数量,则速度可能会变慢。我建议改用固定线程池。
问题是您使用的是嵌套循环,而您应该使用两个单独的循环:一个用于启动 WordCounts
,另一个用于收集结果,例如
public class WordCountTest {
public static void main(String[] args){
Queue<Integer> threadQueue = new ConcurrentLinkedQueue<>();
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CountDownLatch latch = new CountDownLatch(args.length);
for (int i = 0; i<args.length; i++){
CompletableFuture.runAsync(new WordCount(args[i], threadQueue), executor)
.thenRunAsync(latch.countDown(), executor);
}
latch.await();
int sum = 0;
for(Integer i : threadQueue) {
sum += i;
}
}
}
或者无论您希望如何实施,重点是您不应该在所有 WordCounts
开始之前开始收集结果。