Java - Multi-threaded crawler with ExecutorService
I'm working on building a crawler in Java. I made a single-threaded crawler that visits a single page and fetches all the links on that page. Now I want to make it multi-threaded, but I'm facing difficulties. At the start I begin from a single seed link and crawl through all the links inside it; now I want to run an ExecutorService in which each thread takes a single url from unvisitedLinks and starts processing it just like the single-threaded crawler does, with more threads doing the same thing in parallel. Here is the crawler class I made, which implements Runnable so it can run as a thread:
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
public class MyCrawler implements Runnable {
volatile static int counter =0;
String originaUrl, currentUrl;
List<String> unvisitedLinks = new ArrayList<>();
Set<String> visitedLinks = new HashSet<>();
URI uri;
ExecutorService executor = null;
int pagesVisited = 0;
public MyCrawler(String url) {
this.originaUrl = url;
unvisitedLinks.add(url);
this.uri = URI.create(url);
}
@Override
public void run() {
do{
try{
executor = Executors.newFixedThreadPool(10);
String url;
synchronized (this) {
url = unvisitedLinks.get(0);
while (unvisitedLinks.contains(url)) {
unvisitedLinks.remove(url);
}
}
//Visit this page and fetch all the links;
VisitPage(url);
visitedLinks.add(url);
for(int i = 0; i< 10; i++){
synchronized (this) {
url = unvisitedLinks.get(i);
while (unvisitedLinks.contains(url)) {
unvisitedLinks.remove(url);
}
}
Runnable worker = new MyCrawler(url);
executor.execute(worker);
}
executor.shutdown();
while(!executor.isTerminated()){ //WAIT FOR EXECUTOR TO FINISH
}
executor = null;
}catch(Exception e){
e.printStackTrace();
}
}while(unvisitedLinks.size() != 0);
System.out.println("total pages visited: " + counter);
System.out.println("TOTAL LINKS FOUND " + visitedLinks.size());
for(String s: visitedLinks){
System.out.println(s + "\n");
}
}
private void VisitPage(String url){
List<String> linksOnthisPage = new ArrayList<>();
if(!visitedLinks.contains(url)){
if(!url.contains("javascript") && !url.contains("#")){
try{
Document doc = Jsoup.connect(url).timeout(0).get();
Elements linkTags = doc.select("a[href]");
for(Element e : linkTags){
String link = e.attr("href");
if(!visitedLinks.contains(link) && !link.contains("#") && !link.contains("javascript") && !link.equals(url)){
if(link.startsWith("http") || link.startsWith("www")){
if(link.contains(uri.getHost())){
linksOnthisPage.add(link);
}else{
System.out.println("SOME OTHER WEBSITE -- " + link);
}
}else if(link.startsWith("/")){
link = url + link.substring(1, link.length());
linksOnthisPage.add(link);
}else{
System.out.println("LINK IGNORED DUE TO -- " + url);
}
}else{
System.out.println("LINK IGNORED -- " + url);
}
}
System.out.println("\n\nLinks found in \"" + url+ "\" : " + linksOnthisPage.size());
unvisitedLinks.addAll(linksOnthisPage);
System.out.println("UNVISITED LINKS NOW: " + unvisitedLinks.size());
}catch(Exception e){
System.out.println("EXCEPTION -- " + url);
return;
}
}else{
System.out.println("UNWANTED URL -- " + url);
}
}else{
System.out.println("LINK VISITED -- " + url);
}
}
}
Here is the main method from which I start by submitting the link.
public class MainClass {
public static void main(String[] args) {
try{
Thread t = new Thread(new MyCrawler("http://www.example.com/"));
t.start();
t.join();
System.out.println("\nFinished all threads\n---------------------------------");
}catch(Exception e){
e.printStackTrace();
}
System.out.println("DONE!");
}
}
P.S. You will probably find plenty of mistakes in this code. Please do point them out.
I think what you need to do is handle only the url-visiting part inside the Runnable, which means the Runnable class would look like this:
public class MyCrawler implements Runnable {
String url;
URI uri;
public MyCrawler(String url) {
this.url = url; // keep the raw url as a field so run() can pass it to VisitPage
this.uri = URI.create(url);
}
@Override
public void run() {
try{
VisitPage(url);
}catch(Exception e){
e.printStackTrace();
}
}
private void VisitPage(String url){
List<String> linksOnthisPage = new ArrayList<>();
if(!url.contains("javascript") && !url.contains("#")){
try{
Document doc = Jsoup.connect(url).timeout(0).get();
Elements linkTags = doc.select("a[href]");
for(Element e : linkTags){
String link = e.attr("href");
if(!link.contains("#") && !link.contains("javascript") && !link.equals(url)){
if(link.startsWith("http") || link.startsWith("www")){
if(link.contains(uri.getHost())){
linksOnthisPage.add(link);
}else{
System.out.println("SOME OTHER WEBSITE -- " + link);
}
}else if(link.startsWith("/")){
link = url + link.substring(1, link.length());
linksOnthisPage.add(link);
}else{
System.out.println("LINK IGNORED DUE TO -- " + url);
}
}else{
System.out.println("LINK IGNORED -- " + url);
}
}
System.out.println("\n\nLinks found in \"" + url+ "\" : " + linksOnthisPage.size());
}catch(Exception e){
System.out.println("EXCEPTION -- " + url);
return;
}
}else{
System.out.println("UNWANTED URL -- " + url);
}
}
}
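Note that in this version linksOnthisPage is only collected and printed; nothing feeds it back into unvisitedLinks. A thread-safe shared frontier is one way to close that loop. Below is a minimal sketch; the Frontier class and its field name are illustrative, not part of the original code:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Frontier {
    // One shared, thread-safe queue of urls still to visit; workers can
    // add and poll from it concurrently without extra synchronization.
    public static final Queue<String> UNVISITED = new ConcurrentLinkedQueue<>();
}

Each worker would then end VisitPage with Frontier.UNVISITED.addAll(linksOnthisPage) instead of only printing the count.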
Next, iterate over the links and add a job to the executor for each url (you can do this in the main method or in a new class). The snippet would look like this:
for (String url : unvisitedLinks) {
Runnable worker = new MyCrawler(url);
executor.execute(worker);
}
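For that snippet to compile, an ExecutorService has to exist in scope and be shut down once everything is submitted. A minimal wrapper might look like the following sketch; the class name CrawlerMain, the pool size of 10, and the timeout are my own arbitrary choices, not from the original code:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CrawlerMain {
    public static void main(String[] args) throws InterruptedException {
        // Illustrative seed list; in a real crawl this grows as pages are visited.
        List<String> unvisitedLinks = List.of("http://www.example.com/");
        ExecutorService executor = Executors.newFixedThreadPool(10);
        for (String url : unvisitedLinks) {
            executor.execute(new MyCrawler(url));
        }
        executor.shutdown();                             // stop accepting new jobs
        executor.awaitTermination(10, TimeUnit.MINUTES); // wait for in-flight jobs to finish
    }
}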
There are a few things we need to think about before we start:
- How do we avoid visiting a page we have already seen?
- When do we shut down the thread pool?
- How do we notify the main thread that all tasks have finished, so that it can print the site map?
To solve the first problem, we remember whether a link has been visited using a map.
To solve the second, we keep a counter: add 1 each time a new task is submitted and subtract 1 each time a task completes. When the counter drops to zero, all tasks are done.
To solve the third, we need some synchronization mechanism between the main thread and the thread pool; a CountDownLatch that the main thread waits on does the job.
Now we have a solution like this (imports from java.util, java.util.concurrent, jsoup and SLF4J are omitted for brevity):
public class Worker {
public static final Logger logger = LoggerFactory.getLogger(Worker.class);
private ConcurrentHashMap<String, Boolean> visited = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, Set<String>> graph = new ConcurrentHashMap<>();
private final String domain;
private final ExecutorService executorService = Executors.newFixedThreadPool(8);
public final AtomicInteger counter = new AtomicInteger(0);
private final CountDownLatch done;
public Worker(String domain, CountDownLatch done) {
this.domain = domain;
this.done = done;
}
public void start() {
visited.put(domain, true); // mark the seed as visited so a link back to it is not crawled twice
executorService.submit(new CrawlTask(domain));
}
public ConcurrentHashMap<String, Set<String>> getGraph() {
return graph;
}
public class CrawlTask implements Runnable {
public final String url;
public CrawlTask(String url) {
this.url = url;
}
@Override
public void run() {
logger.info("remaining tasks: {}, visiting {}", counter.get(), url);
try {
Document doc = Jsoup.connect(url).timeout(5000).get();
// "abs:href" makes jsoup resolve relative links against the page url
Set<String> links = doc.select("a").stream().map(e -> e.attr("abs:href"))
.filter(l -> Utils.isInDomain(l, domain))
.map(Utils::trimURL)
.collect(Collectors.toSet());
graph.put(url, links);
for (String link : links) {
// putIfAbsent claims the link atomically; getOrDefault followed by
// put would let two tasks claim and submit the same url
if (visited.putIfAbsent(link, true) == null) {
counter.getAndIncrement();
executorService.submit(new CrawlTask(link));
}
}
} catch (IOException e) {
logger.warn("URL: {}, {}", url, e.toString());
// fall through: the bookkeeping below must run for failed fetches too,
// otherwise the counter never reaches zero and the latch never fires
}
int n = counter.getAndDecrement();
if (n == 0) {
// last task finished; awaitTermination from inside a pool thread can
// never succeed (this task is still running), so just shut down and
// release the main thread
executorService.shutdown();
done.countDown();
}
}
}}
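The Worker above leans on two small helpers that are not shown. Their exact behavior in the original source is an assumption on my part; a minimal sketch that fits how they are used would be:

public class Utils {
    // Assumption: a link counts as in-domain when it starts with the crawl root.
    public static boolean isInDomain(String link, String domain) {
        return link != null && link.startsWith(domain);
    }

    // Assumption: normalize urls so equivalent forms map to one key,
    // here by dropping any #fragment and a trailing slash.
    public static String trimURL(String url) {
        int hash = url.indexOf('#');
        if (hash >= 0) {
            url = url.substring(0, hash);
        }
        if (url.endsWith("/")) {
            url = url.substring(0, url.length() - 1);
        }
        return url;
    }
}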
And the main method (again with imports omitted):
public class CounterApp {
public static void main(String[] args) throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(1);
String domain = "https://example.com";
Worker worker = new Worker(domain, doneSignal);
worker.start();
doneSignal.await();
Map<String, Set<String>> graph = worker.getGraph();
graph.forEach((k, v) -> {
System.out.println(k + ": ");
v.forEach(l -> System.out.println(" " + l));
});
}}
Source code here