Java 中的多线程网络爬虫
Multithreaded Web Crawler in Java
我正在尝试使用 Jsoup.I 在 Java 中编写一个多线程网络爬虫有一个 Java Class "Master" 创建 6 个线程(5 个用于爬行和 1 个用于维护队列),以及 3 个队列,即 "to_do"、"to_do_next"(将在下一次迭代中完成)和 "done"(最终链接)。
我在共享 queues.The 上使用同步锁的想法是,一旦所有 5 个线程发现 "to_do" 队列为空,它们就会通知一个维护线程,该线程会做一些工作并通知这些线程 back.But 问题程序是否有时会被阻塞(所以我假设存在一些我无法处理的竞争条件)......而且在检查时我发现并非所有线程都收到维护 thread.so 的通知是吗可能会丢失一些通知信号??
硕士代码class
private Queue<String> to_do = new LinkedList<String>();
private Queue<String> done= new LinkedList<String>();
private Queue<String> to_do_next = new LinkedList<String>();
private int level = 1;
private Object lock1 = new Object();
private Object lock2 = new Object();
private Object lock3 = new Object();
private static Thread maintenance;
public static Master mref;
public static Object wait1 = new Object();
public static Object wait2 = new Object();
public static Object wait3 = new Object();
public static int flag = 5;
public static int missedSignals = -1;
public boolean checkToDoEmpty(){
return to_do.isEmpty();
}
public int getLevel() {
return level;
}
public void incLevel() {
this.level++;
}
public static void interrupt() {
maintenance.interrupt();
}
public void transfer() {
to_do = to_do_next;
}
public String accessToDo() {
synchronized(lock1){
String tmp = to_do.peek();
if(tmp != null)
tmp = to_do.remove();
return tmp;
}
}
public void addToDoNext(String url){
synchronized(lock2){
to_do_next.add(url);
}
}
public void addDone(String string) {
synchronized(lock3){
done.add(string);
}
}
public static void main(String[] args){
Master m = new Master();
mref = m;
URL startUrl = null;
try {
startUrl = new URL("http://cse.iitkgp.ac.in");
}catch (MalformedURLException e1) {
e1.printStackTrace();
}
Thread t1 = new Thread(new Worker(1));
Thread t2 = new Thread(new Worker(2));
Thread t3 = new Thread(new Worker(3));
Thread t4 = new Thread(new Worker(4));
Thread t5 = new Thread(new Worker(5));
maintenance = new Thread(new MaintenanceThread());
m.to_do.add(startUrl.toString());
maintenance.start();
t1.start();
t2.start();
t3.start();
t4.start();
t5.start();
try {
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
/*for(String s:m.done)
System.out.println(s);
for(String s:m.to_do)
System.out.println(s);*/
}
工作线程代码
public void run() {
while(Master.mref.getLevel() != 3){
if(!Master.mref.checkToDoEmpty()){
String url = Master.mref.accessToDo();
if(url != null && url.contains("iitkgp") && url.contains("http://")){
try {
Document doc = Jsoup.connect(url).get();
org.jsoup.select.Elements links = doc.select("a[href]");
for(org.jsoup.nodes.Element l: links){
Master.mref.addToDoNext(l.attr("abs:href").toString());
}
Master.mref.addDone(url);
} catch (IOException e) {
System.out.println(url);
e.printStackTrace();
}
continue;
}
}
//System.out.println("thread " + id + " about to notify on wait1");
synchronized(Master.wait1){
Master.wait1.notify();
Master.missedSignals++;
}
synchronized(Master.wait2){
try {
Master.wait2.wait();
System.out.println("thread " + id + " coming out of wait2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("Terminating " + id + " thread");
Master.flag--;
if(Master.flag == 0)
Master.interrupt();
}
维护线程代码
while(Master.flag != 0){
try {
synchronized(Master.wait1){
if(Master.missedSignals != -1){
count += Master.missedSignals;
Master.missedSignals = -1;
}
while(count != 5){
Master.wait1.wait();
if(Master.missedSignals != -1)
count += Master.missedSignals;
Master.missedSignals = -1;
count++;
}
count = 0;
}
//System.out.println("in between");
Master.mref.incLevel();
Master.mref.transfer();
synchronized(Master.wait2){
Master.wait2.notifyAll();
}
} catch (InterruptedException e) {
break;
}
}
System.out.println("Mainta thread gone");
你的设计太复杂了
我建议为您的 to_do 队列使用以下内容:LinkedBlockingQueue
这是一个阻塞队列,这意味着您的线程会从队列中请求一个对象,只有当一个出现时它们才会得到该对象,直到那时它们才会保持阻塞状态。
只需使用以下方法在队列中放入和取出对象:put() & take()
关于这个特殊队列的更多解释,请查看以下两个链接:
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html
http://tutorials.jenkov.com/java-util-concurrent/linkedblockingqueue.html
现在,您唯一关心的是在线程完成工作后终止线程,为此我建议如下:
boolean someThreadStillAlive = true;
while (someThreadStillAlive) {
someThreadStillAlive = false;
Thread.sleep(200);
for (Thread t : fetchAndParseThreads) {
someThreadStillAlive = someThreadStillAlive || t.isAlive();
}
}
这将发生在您的主代码块中,它将循环并休眠直到所有线程完成。
哦,你可以使用 poll(int timeout...) 而不是 take(),它将等待超时完成,如果没有新对象插入队列,它将终止线程。
以上所有,都在我自己的爬虫中成功使用。
我正在尝试使用 Jsoup.I 在 Java 中编写一个多线程网络爬虫有一个 Java Class "Master" 创建 6 个线程(5 个用于爬行和 1 个用于维护队列),以及 3 个队列,即 "to_do"、"to_do_next"(将在下一次迭代中完成)和 "done"(最终链接)。 我在共享 queues.The 上使用同步锁的想法是,一旦所有 5 个线程发现 "to_do" 队列为空,它们就会通知一个维护线程,该线程会做一些工作并通知这些线程 back.But 问题程序是否有时会被阻塞(所以我假设存在一些我无法处理的竞争条件)......而且在检查时我发现并非所有线程都收到维护 thread.so 的通知是吗可能会丢失一些通知信号??
硕士代码class
private Queue<String> to_do = new LinkedList<String>();
private Queue<String> done= new LinkedList<String>();
private Queue<String> to_do_next = new LinkedList<String>();
private int level = 1;
private Object lock1 = new Object();
private Object lock2 = new Object();
private Object lock3 = new Object();
private static Thread maintenance;
public static Master mref;
public static Object wait1 = new Object();
public static Object wait2 = new Object();
public static Object wait3 = new Object();
public static int flag = 5;
public static int missedSignals = -1;
public boolean checkToDoEmpty(){
return to_do.isEmpty();
}
public int getLevel() {
return level;
}
public void incLevel() {
this.level++;
}
public static void interrupt() {
maintenance.interrupt();
}
public void transfer() {
to_do = to_do_next;
}
public String accessToDo() {
synchronized(lock1){
String tmp = to_do.peek();
if(tmp != null)
tmp = to_do.remove();
return tmp;
}
}
public void addToDoNext(String url){
synchronized(lock2){
to_do_next.add(url);
}
}
public void addDone(String string) {
synchronized(lock3){
done.add(string);
}
}
public static void main(String[] args){
Master m = new Master();
mref = m;
URL startUrl = null;
try {
startUrl = new URL("http://cse.iitkgp.ac.in");
}catch (MalformedURLException e1) {
e1.printStackTrace();
}
Thread t1 = new Thread(new Worker(1));
Thread t2 = new Thread(new Worker(2));
Thread t3 = new Thread(new Worker(3));
Thread t4 = new Thread(new Worker(4));
Thread t5 = new Thread(new Worker(5));
maintenance = new Thread(new MaintenanceThread());
m.to_do.add(startUrl.toString());
maintenance.start();
t1.start();
t2.start();
t3.start();
t4.start();
t5.start();
try {
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
/*for(String s:m.done)
System.out.println(s);
for(String s:m.to_do)
System.out.println(s);*/
}
工作线程代码
public void run() {
while(Master.mref.getLevel() != 3){
if(!Master.mref.checkToDoEmpty()){
String url = Master.mref.accessToDo();
if(url != null && url.contains("iitkgp") && url.contains("http://")){
try {
Document doc = Jsoup.connect(url).get();
org.jsoup.select.Elements links = doc.select("a[href]");
for(org.jsoup.nodes.Element l: links){
Master.mref.addToDoNext(l.attr("abs:href").toString());
}
Master.mref.addDone(url);
} catch (IOException e) {
System.out.println(url);
e.printStackTrace();
}
continue;
}
}
//System.out.println("thread " + id + " about to notify on wait1");
synchronized(Master.wait1){
Master.wait1.notify();
Master.missedSignals++;
}
synchronized(Master.wait2){
try {
Master.wait2.wait();
System.out.println("thread " + id + " coming out of wait2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("Terminating " + id + " thread");
Master.flag--;
if(Master.flag == 0)
Master.interrupt();
}
维护线程代码
while(Master.flag != 0){
try {
synchronized(Master.wait1){
if(Master.missedSignals != -1){
count += Master.missedSignals;
Master.missedSignals = -1;
}
while(count != 5){
Master.wait1.wait();
if(Master.missedSignals != -1)
count += Master.missedSignals;
Master.missedSignals = -1;
count++;
}
count = 0;
}
//System.out.println("in between");
Master.mref.incLevel();
Master.mref.transfer();
synchronized(Master.wait2){
Master.wait2.notifyAll();
}
} catch (InterruptedException e) {
break;
}
}
System.out.println("Mainta thread gone");
你的设计太复杂了
我建议为您的 to_do 队列使用以下内容:LinkedBlockingQueue
这是一个阻塞队列,这意味着您的线程会从队列中请求一个对象,只有当一个出现时它们才会得到该对象,直到那时它们才会保持阻塞状态。
只需使用以下方法在队列中放入和取出对象:put() & take()
关于这个特殊队列的更多解释,请查看以下两个链接: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html
http://tutorials.jenkov.com/java-util-concurrent/linkedblockingqueue.html
现在,您唯一关心的是在线程完成工作后终止线程,为此我建议如下:
boolean someThreadStillAlive = true;
while (someThreadStillAlive) {
someThreadStillAlive = false;
Thread.sleep(200);
for (Thread t : fetchAndParseThreads) {
someThreadStillAlive = someThreadStillAlive || t.isAlive();
}
}
这将发生在您的主代码块中,它将循环并休眠直到所有线程完成。
哦,你可以使用 poll(int timeout...) 而不是 take(),它将等待超时完成,如果没有新对象插入队列,它将终止线程。
以上所有,都在我自己的爬虫中成功使用。