Java 个线程 - 等待所有子线程以继续
Java threads - waiting on all child threads in order to proceed
所以有点背景;
我正在从事一个项目,其中一个 servlet 将在文件系统中的大量文本文件上发布爬虫。我当时想着把负载分到多个线程下,例如:
一个爬虫进入一个目录,找到3个文件和6个目录。它将开始处理文件并为其他目录启动一个新的爬虫线程。因此,从我的创建者 class 那里,我将在基本目录上创建一个爬虫。爬虫将评估工作量,如果认为需要,它会在另一个线程下生成另一个爬虫。
我的爬虫 class 看起来像这样
package com.fujitsu.spider;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
public class DocumentSpider implements Runnable, Serializable {
private static final long serialVersionUID = 8401649393078703808L;
private Spidermode currentMode = null;
private String URL = null;
private String[] terms = null;
private float score = 0;
private ArrayList<SpiderDataPair> resultList = null;
public enum Spidermode {
FILE, DIRECTORY
}
public DocumentSpider(String resourceURL, Spidermode mode, ArrayList<SpiderDataPair> resultList) {
currentMode = mode;
setURL(resourceURL);
this.setResultList(resultList);
}
@Override
public void run() {
try {
if (currentMode == Spidermode.FILE) {
doCrawlFile();
} else {
doCrawlDirectory();
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("SPIDER @ " + URL + " HAS FINISHED.");
}
public Spidermode getCurrentMode() {
return currentMode;
}
public void setCurrentMode(Spidermode currentMode) {
this.currentMode = currentMode;
}
public String getURL() {
return URL;
}
public void setURL(String uRL) {
URL = uRL;
}
public void doCrawlFile() throws Exception {
File target = new File(URL);
if (target.isDirectory()) {
throw new Exception(
"This URL points to a directory while the spider is in FILE mode. Please change this spider to FILE mode.");
}
procesFile(target);
}
public void doCrawlDirectory() throws Exception {
File baseDir = new File(URL);
if (!baseDir.isDirectory()) {
throw new Exception(
"This URL points to a FILE while the spider is in DIRECTORY mode. Please change this spider to DIRECTORY mode.");
}
File[] directoryContent = baseDir.listFiles();
for (File f : directoryContent) {
if (f.isDirectory()) {
DocumentSpider spider = new DocumentSpider(f.getPath(), Spidermode.DIRECTORY, this.resultList);
spider.terms = this.terms;
(new Thread(spider)).start();
} else {
DocumentSpider spider = new DocumentSpider(f.getPath(), Spidermode.FILE, this.resultList);
spider.terms = this.terms;
(new Thread(spider)).start();
}
}
}
public void procesDirectory(String target) throws IOException {
File base = new File(target);
File[] directoryContent = base.listFiles();
for (File f : directoryContent) {
if (f.isDirectory()) {
procesDirectory(f.getPath());
} else {
procesFile(f);
}
}
}
public void procesFile(File target) throws IOException {
BufferedReader br = new BufferedReader(new FileReader(target));
String line;
while ((line = br.readLine()) != null) {
String[] words = line.split(" ");
for (String currentWord : words) {
for (String a : terms) {
if (a.toLowerCase().equalsIgnoreCase(currentWord)) {
score += 1f;
}
;
if (currentWord.toLowerCase().contains(a)) {
score += 1f;
}
;
}
}
}
br.close();
resultList.add(new SpiderDataPair(this, URL));
}
public String[] getTerms() {
return terms;
}
public void setTerms(String[] terms) {
this.terms = terms;
}
public float getScore() {
return score;
}
public void setScore(float score) {
this.score = score;
}
public ArrayList<SpiderDataPair> getResultList() {
return resultList;
}
public void setResultList(ArrayList<SpiderDataPair> resultList) {
this.resultList = resultList;
}
}
我面临的问题是,在我的根爬虫中,我有来自我想要进一步处理的每个爬虫的结果列表。处理来自该列表的数据的操作是从 servlet(或本例的 main 方法)中调用的。然而,这些操作总是在所有爬虫完成它们的处理之前被调用。从而过早启动操作来处理结果,从而导致数据不完整。
我尝试使用连接方法解决这个问题,但不幸的是我似乎无法解决这个问题。
package com.fujitsu.spider;
import java.util.ArrayList;
import com.fujitsu.spider.DocumentSpider.Spidermode;
public class Main {
public static void main(String[] args) throws InterruptedException {
ArrayList<SpiderDataPair> results = new ArrayList<SpiderDataPair>();
String [] terms = {"SERVER","CHANGE","MO"};
DocumentSpider spider1 = new DocumentSpider("C:\Users\Mark\workspace\Spider\Files", Spidermode.DIRECTORY, results);
spider1.setTerms(terms);
DocumentSpider spider2 = new DocumentSpider("C:\Users\Mark\workspace\Spider\File2", Spidermode.DIRECTORY, results);
spider2.setTerms(terms);
Thread t1 = new Thread(spider1);
Thread t2 = new Thread(spider2);
t1.start();
t1.join();
t2.start();
t2.join();
for(SpiderDataPair d : spider1.getResultList()){
System.out.println("PATH -> " + d.getFile() + " SCORE -> " + d.getSpider().getScore());
}
for(SpiderDataPair d : spider2.getResultList()){
System.out.println("PATH -> " + d.getFile() + " SCORE -> " + d.getSpider().getScore());
}
}
}
TL:DR
我真的很想了解这个主题,所以任何帮助将不胜感激!
您应该使用比裸 Thread
更高级别的库来完成此任务。我建议特别研究 ExecutorService
和所有 java.util.concurrent
。那里有抽象可以管理所有线程问题,同时为格式良好的任务提供适当保护的环境,运行。
对于您的具体问题,我会推荐某种阻塞任务队列和标准的生产者-消费者架构。每个任务都知道如何确定其路径是文件还是目录。如果是文件,则处理该文件;如果它是目录,则抓取目录的直接内容并为每个子路径排队新任务。您还可以使用一些正确同步的共享状态来限制处理的文件数量、深度等。此外,该服务提供等待其任务终止的能力,使 "join" 更简单。
使用此架构,您可以将线程和线程管理(由 ExecutorService
处理)的概念与您的任务业务逻辑(通常是 Runnable
或 Callable
)分离。服务本身能够调整实例化方式,例如固定的最大线程数或可扩展的线程数,具体取决于存在的并发任务数(请参阅 java.util.concurrent.Executors
上的工厂方法)。 Thread
s,比它们执行的 Runnable
s 更昂贵,被重新使用以节省资源。
如果您的 objective 主要是在生产质量方面发挥作用的功能,那么图书馆就是您的不二之选。但是,如果您的 objective 是想了解线程管理的底层细节,那么您可能想要研究锁存器的使用,或许还需要线程组来在底层管理它们,从而暴露实现的细节,因此你可以处理细节。
您需要对代码进行一些更改:
在蜘蛛中:
List<Thread> threads = new LinkedList<Thread>();
for (File f : directoryContent) {
if (f.isDirectory()) {
DocumentSpider spider = new DocumentSpider(f.getPath(), Spidermode.DIRECTORY, this.resultList);
spider.terms = this.terms;
Thread thread = new Thread(spider);
threads.add(thread)
thread.start();
} else {
DocumentSpider spider = new DocumentSpider(f.getPath(), Spidermode.FILE, this.resultList);
spider.terms = this.terms;
Thread thread = new Thread(spider);
threads.add(thread)
thread.start();
}
}
for (Thread thread: threads) thread.join()
想法是为每个蜘蛛创建一个新线程并启动它。一旦它们都是 运行,您就等到每个 on 都完成,然后 Spider 才完成。这样每个蜘蛛线程都保持 运行 直到它的所有工作完成(因此顶级线程运行直到所有 children 和它们的 children 完成)。
您还需要更改您的跑步者,使其并行运行两个蜘蛛,而不是像这样一个接一个地运行:
Thread t1 = new Thread(spider1);
Thread t2 = new Thread(spider2);
t1.start();
t2.start();
t1.join();
t2.join();
所以有点背景;
我正在从事一个项目,其中一个 servlet 将在文件系统中的大量文本文件上发布爬虫。我当时想着把负载分到多个线程下,例如:
一个爬虫进入一个目录,找到3个文件和6个目录。它将开始处理文件并为其他目录启动一个新的爬虫线程。因此,从我的创建者 class 那里,我将在基本目录上创建一个爬虫。爬虫将评估工作量,如果认为需要,它会在另一个线程下生成另一个爬虫。
我的爬虫 class 看起来像这样
package com.fujitsu.spider;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
public class DocumentSpider implements Runnable, Serializable {
private static final long serialVersionUID = 8401649393078703808L;
private Spidermode currentMode = null;
private String URL = null;
private String[] terms = null;
private float score = 0;
private ArrayList<SpiderDataPair> resultList = null;
public enum Spidermode {
FILE, DIRECTORY
}
public DocumentSpider(String resourceURL, Spidermode mode, ArrayList<SpiderDataPair> resultList) {
currentMode = mode;
setURL(resourceURL);
this.setResultList(resultList);
}
@Override
public void run() {
try {
if (currentMode == Spidermode.FILE) {
doCrawlFile();
} else {
doCrawlDirectory();
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("SPIDER @ " + URL + " HAS FINISHED.");
}
public Spidermode getCurrentMode() {
return currentMode;
}
public void setCurrentMode(Spidermode currentMode) {
this.currentMode = currentMode;
}
public String getURL() {
return URL;
}
public void setURL(String uRL) {
URL = uRL;
}
public void doCrawlFile() throws Exception {
File target = new File(URL);
if (target.isDirectory()) {
throw new Exception(
"This URL points to a directory while the spider is in FILE mode. Please change this spider to FILE mode.");
}
procesFile(target);
}
public void doCrawlDirectory() throws Exception {
File baseDir = new File(URL);
if (!baseDir.isDirectory()) {
throw new Exception(
"This URL points to a FILE while the spider is in DIRECTORY mode. Please change this spider to DIRECTORY mode.");
}
File[] directoryContent = baseDir.listFiles();
for (File f : directoryContent) {
if (f.isDirectory()) {
DocumentSpider spider = new DocumentSpider(f.getPath(), Spidermode.DIRECTORY, this.resultList);
spider.terms = this.terms;
(new Thread(spider)).start();
} else {
DocumentSpider spider = new DocumentSpider(f.getPath(), Spidermode.FILE, this.resultList);
spider.terms = this.terms;
(new Thread(spider)).start();
}
}
}
public void procesDirectory(String target) throws IOException {
File base = new File(target);
File[] directoryContent = base.listFiles();
for (File f : directoryContent) {
if (f.isDirectory()) {
procesDirectory(f.getPath());
} else {
procesFile(f);
}
}
}
public void procesFile(File target) throws IOException {
BufferedReader br = new BufferedReader(new FileReader(target));
String line;
while ((line = br.readLine()) != null) {
String[] words = line.split(" ");
for (String currentWord : words) {
for (String a : terms) {
if (a.toLowerCase().equalsIgnoreCase(currentWord)) {
score += 1f;
}
;
if (currentWord.toLowerCase().contains(a)) {
score += 1f;
}
;
}
}
}
br.close();
resultList.add(new SpiderDataPair(this, URL));
}
public String[] getTerms() {
return terms;
}
public void setTerms(String[] terms) {
this.terms = terms;
}
public float getScore() {
return score;
}
public void setScore(float score) {
this.score = score;
}
public ArrayList<SpiderDataPair> getResultList() {
return resultList;
}
public void setResultList(ArrayList<SpiderDataPair> resultList) {
this.resultList = resultList;
}
}
我面临的问题是,在我的根爬虫中,我有来自我想要进一步处理的每个爬虫的结果列表。处理来自该列表的数据的操作是从 servlet(或本例的 main 方法)中调用的。然而,这些操作总是在所有爬虫完成它们的处理之前被调用。从而过早启动操作来处理结果,从而导致数据不完整。
我尝试使用连接方法解决这个问题,但不幸的是我似乎无法解决这个问题。
package com.fujitsu.spider;
import java.util.ArrayList;
import com.fujitsu.spider.DocumentSpider.Spidermode;
public class Main {
public static void main(String[] args) throws InterruptedException {
ArrayList<SpiderDataPair> results = new ArrayList<SpiderDataPair>();
String [] terms = {"SERVER","CHANGE","MO"};
DocumentSpider spider1 = new DocumentSpider("C:\Users\Mark\workspace\Spider\Files", Spidermode.DIRECTORY, results);
spider1.setTerms(terms);
DocumentSpider spider2 = new DocumentSpider("C:\Users\Mark\workspace\Spider\File2", Spidermode.DIRECTORY, results);
spider2.setTerms(terms);
Thread t1 = new Thread(spider1);
Thread t2 = new Thread(spider2);
t1.start();
t1.join();
t2.start();
t2.join();
for(SpiderDataPair d : spider1.getResultList()){
System.out.println("PATH -> " + d.getFile() + " SCORE -> " + d.getSpider().getScore());
}
for(SpiderDataPair d : spider2.getResultList()){
System.out.println("PATH -> " + d.getFile() + " SCORE -> " + d.getSpider().getScore());
}
}
}
TL:DR
我真的很想了解这个主题,所以任何帮助将不胜感激!
您应该使用比裸 Thread
更高级别的库来完成此任务。我建议特别研究 ExecutorService
和所有 java.util.concurrent
。那里有抽象可以管理所有线程问题,同时为格式良好的任务提供适当保护的环境,运行。
对于您的具体问题,我会推荐某种阻塞任务队列和标准的生产者-消费者架构。每个任务都知道如何确定其路径是文件还是目录。如果是文件,则处理该文件;如果它是目录,则抓取目录的直接内容并为每个子路径排队新任务。您还可以使用一些正确同步的共享状态来限制处理的文件数量、深度等。此外,该服务提供等待其任务终止的能力,使 "join" 更简单。
使用此架构,您可以将线程和线程管理(由 ExecutorService
处理)的概念与您的任务业务逻辑(通常是 Runnable
或 Callable
)分离。服务本身能够调整实例化方式,例如固定的最大线程数或可扩展的线程数,具体取决于存在的并发任务数(请参阅 java.util.concurrent.Executors
上的工厂方法)。 Thread
s,比它们执行的 Runnable
s 更昂贵,被重新使用以节省资源。
如果您的 objective 主要是在生产质量方面发挥作用的功能,那么图书馆就是您的不二之选。但是,如果您的 objective 是想了解线程管理的底层细节,那么您可能想要研究锁存器的使用,或许还需要线程组来在底层管理它们,从而暴露实现的细节,因此你可以处理细节。
您需要对代码进行一些更改:
在蜘蛛中:
List<Thread> threads = new LinkedList<Thread>();
for (File f : directoryContent) {
if (f.isDirectory()) {
DocumentSpider spider = new DocumentSpider(f.getPath(), Spidermode.DIRECTORY, this.resultList);
spider.terms = this.terms;
Thread thread = new Thread(spider);
threads.add(thread)
thread.start();
} else {
DocumentSpider spider = new DocumentSpider(f.getPath(), Spidermode.FILE, this.resultList);
spider.terms = this.terms;
Thread thread = new Thread(spider);
threads.add(thread)
thread.start();
}
}
for (Thread thread: threads) thread.join()
想法是为每个蜘蛛创建一个新线程并启动它。一旦它们都是 运行,您就等到每个 on 都完成,然后 Spider 才完成。这样每个蜘蛛线程都保持 运行 直到它的所有工作完成(因此顶级线程运行直到所有 children 和它们的 children 完成)。
您还需要更改您的跑步者,使其并行运行两个蜘蛛,而不是像这样一个接一个地运行:
Thread t1 = new Thread(spider1);
Thread t2 = new Thread(spider2);
t1.start();
t2.start();
t1.join();
t2.join();