Java - Multi-threaded crawler with ExecutorService

I am working on building a web crawler in Java. I have made a single-threaded crawler that visits a single page and fetches all the links on it. Now I want to make it multi-threaded, but I am running into difficulties. I start from a single link of a page and crawl through all the links in it; now I want to run an ExecutorService in which each thread picks a single url from unvisitedLinks and processes it just as the single-threaded crawler does, with the other threads doing the same thing in parallel. Here is the crawler class I have made, which implements Runnable so it can run as a thread:

import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

public class MyCrawler implements Runnable {
    volatile static int counter =0;
    String originaUrl, currentUrl;
    List<String> unvisitedLinks = new ArrayList<>();
    Set<String> visitedLinks = new HashSet<>();
    URI uri;
    ExecutorService executor = null;
    int pagesVisited = 0;


    public MyCrawler(String url) {
        this.originaUrl = url;
        unvisitedLinks.add(url);
         this.uri = URI.create(url);
    }

    @Override
    public void run() {
        do{
            try{
                executor = Executors.newFixedThreadPool(10);
                String url; 
                synchronized (this) {
                    url = unvisitedLinks.get(0);
                    while (unvisitedLinks.contains(url)) {
                        unvisitedLinks.remove(url);
                    }
                }
                //Visit this page and fetch all the links;
                VisitPage(url);

                visitedLinks.add(url);

                for(int i = 0; i< 10; i++){
                    synchronized (this) {
                        url = unvisitedLinks.get(i);
                        while (unvisitedLinks.contains(url)) {
                            unvisitedLinks.remove(url);
                        }
                    }
                    Runnable worker = new MyCrawler(url);
                    executor.execute(worker);
                }

                executor.shutdown();
                while(!executor.isTerminated()){ //WAIT FOR EXECUTOR TO FINISH

                }
                executor = null;
            }catch(Exception e){
                e.printStackTrace();
            }

        }while(unvisitedLinks.size() != 0);
        System.out.println("total pages visited: " + counter);
        System.out.println("TOTAL LINKS FOUND " + visitedLinks.size());

        for(String s: visitedLinks){
            System.out.println(s + "\n");
        }
    }

    private void VisitPage(String url){

        List<String> linksOnthisPage = new ArrayList<>();

        if(!visitedLinks.contains(url)){
            if(!url.contains("javascript") && !url.contains("#")){

                try{
                    Document doc = Jsoup.connect(url).timeout(0).get();
                    Elements linkTags = doc.select("a[href]");

                    for(Element e : linkTags){
                        String link = e.attr("href");
                        if(!visitedLinks.contains(link) && !link.contains("#") && !link.contains("javascript") && !link.equals(url)){
                            if(link.startsWith("http") || link.startsWith("www")){
                                if(link.contains(uri.getHost())){
                                    linksOnthisPage.add(link);
                                }else{
                                    System.out.println("SOME OTHER WEBSITE -- " + link);
                                }

                            }else if(link.startsWith("/")){
                                link = url + link.substring(1, link.length());
                                linksOnthisPage.add(link);
                            }else{
                                System.out.println("LINK IGNORED DUE TO  -- " + url);
                            }
                        }else{
                            System.out.println("LINK IGNORED -- " + url);
                        }
                    }
                    System.out.println("\n\nLinks found in \"" + url+ "\" : " + linksOnthisPage.size());
                    unvisitedLinks.addAll(linksOnthisPage);
                    System.out.println("UNVISITED LINKS NOW: " + unvisitedLinks.size());
                }catch(Exception e){
                    System.out.println("EXCEPTION -- " + url);
                    return;
                }
            }else{
                System.out.println("UNWANTED URL -- " + url);
            }
        }else{
            System.out.println("LINK VISITED -- " + url);
        }
    }

}

And here is the main method from which I start by submitting the link:

public class MainClass {

    public static void main(String[] args) {

        try{
            Thread t = new Thread(new MyCrawler("http://www.example.com/"));

            t.start();
            t.join();
            System.out.println("\nFinished all threads\n---------------------------------");

        }catch(Exception e){
            e.printStackTrace();
        }

        System.out.println("DONE!");


    }

}

P.S. You may find many mistakes in this code. Please point them out.

I think what you need to do is handle only the url-visiting part in the Runnable, which means the Runnable class would look something like this:

import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

public class MyCrawler implements Runnable {

    // Keep both the raw url (to fetch) and its parsed form (to compare hosts).
    String url;
    URI uri;

    public MyCrawler(String url) {
        this.url = url;
        this.uri = URI.create(url);
    }

    @Override
    public void run() {
        try {
            VisitPage(url);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void VisitPage(String url) {

        List<String> linksOnthisPage = new ArrayList<>();

        if (!url.contains("javascript") && !url.contains("#")) {

            try {
                Document doc = Jsoup.connect(url).timeout(0).get();
                Elements linkTags = doc.select("a[href]");

                for (Element e : linkTags) {
                    String link = e.attr("href");
                    if (!link.contains("#") && !link.contains("javascript") && !link.equals(url)) {
                        if (link.startsWith("http") || link.startsWith("www")) {
                            if (link.contains(uri.getHost())) {
                                linksOnthisPage.add(link);
                            } else {
                                System.out.println("SOME OTHER WEBSITE -- " + link);
                            }
                        } else if (link.startsWith("/")) {
                            link = url + link.substring(1, link.length());
                            linksOnthisPage.add(link);
                        } else {
                            System.out.println("LINK IGNORED DUE TO  -- " + url);
                        }
                    } else {
                        System.out.println("LINK IGNORED -- " + url);
                    }
                }
                System.out.println("\n\nLinks found in \"" + url + "\" : " + linksOnthisPage.size());

            } catch (Exception e) {
                System.out.println("EXCEPTION -- " + url);
                return;
            }
        } else {
            System.out.println("UNWANTED URL -- " + url);
        }
    }

}

Next, iterate over the links and add a job to the executor for each url (you can do this in the main method or in a new class); the snippet would look something like this:

for (String url : unvisitedLinks) {
    Runnable worker = new MyCrawler(url);
    executor.execute(worker);
}
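
The snippet above assumes an ExecutorService has already been created and that unvisitedLinks has been filled. A minimal sketch of how that wiring might look (the class name CrawlerDriver, the pool size and the example urls are placeholders, not part of the original code):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CrawlerDriver {

    public static void main(String[] args) {
        // Placeholder list; in the real crawler these come from pages already visited.
        List<String> unvisitedLinks = Arrays.asList(
                "http://www.example.com/page1",
                "http://www.example.com/page2");

        ExecutorService executor = Executors.newFixedThreadPool(10);
        for (String url : unvisitedLinks) {
            executor.execute(new MyCrawler(url));
        }
        executor.shutdown(); // no new jobs are accepted after this point
    }
}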

There are a few things we need to consider before we start:

  1. How do we avoid visiting a page we have already seen?
  2. When should the thread pool be shut down?
  3. How do we notify the main thread that all tasks are finished, so that it can print the site map?

To solve the first problem, we need a map that remembers whether a link has already been visited.
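
One way to do that, sketched here as an assumption rather than as the final code, is a ConcurrentHashMap whose putIfAbsent call lets exactly one thread claim each link:

import java.util.concurrent.ConcurrentHashMap;

// Shared, thread-safe registry of links that have already been claimed.
ConcurrentHashMap<String, Boolean> visited = new ConcurrentHashMap<>();

String link = "http://www.example.com/about"; // example link
// putIfAbsent returns null only for the first thread that inserts the key,
// so only that thread goes on to submit a crawl task for this link.
if (visited.putIfAbsent(link, Boolean.TRUE) == null) {
    // submit a crawl task for this link
}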

To solve the second, we need a counter that is incremented by one for every task submitted and decremented by one for every task that finishes. When the counter drops to zero, all tasks are done.
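
Expressed with an AtomicInteger, the pattern looks roughly like this (the full solution below counts slightly differently, because the very first task is never added to the counter):

import java.util.concurrent.atomic.AtomicInteger;

// Number of crawl tasks that have been submitted but not yet finished.
AtomicInteger pending = new AtomicInteger(0);

// Whenever a new task is submitted:
pending.incrementAndGet();

// At the end of every task:
if (pending.decrementAndGet() == 0) {
    // nothing left in flight: the pool can be shut down
}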

To solve the third, we need some synchronization mechanism between the main thread and the thread pool.
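
A CountDownLatch with a count of one fits here: the main thread blocks on await(), and whichever task shuts the pool down releases it with countDown(). A minimal sketch of the two sides:

import java.util.concurrent.CountDownLatch;

CountDownLatch done = new CountDownLatch(1);

// In the main thread, after starting the crawl:
//     done.await();        // blocks until the crawl is finished

// In the last finishing task, after shutting down the pool:
//     done.countDown();    // wakes the main thread up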

Putting it all together, we end up with a solution like this:

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Worker {

    public static final Logger logger = LoggerFactory.getLogger(Worker.class);

    // Remembers which links have already been claimed by a crawl task.
    private ConcurrentHashMap<String, Boolean> visited = new ConcurrentHashMap<>();
    // Site map: url -> set of links found on that page.
    private ConcurrentHashMap<String, Set<String>> graph = new ConcurrentHashMap<>();

    private final String domain;
    private final ExecutorService executorService = Executors.newFixedThreadPool(8);

    // Number of submitted-but-unfinished tasks (the initial task is not counted).
    public final AtomicInteger counter = new AtomicInteger(0);
    // Signals the main thread once the whole crawl is done.
    private final CountDownLatch done;

    public Worker(String domain, CountDownLatch done) {
        this.domain = domain;
        this.done = done;
    }

    public void start() {
        executorService.submit(new CrawlTask(domain));
    }

    public ConcurrentHashMap<String, Set<String>> getGraph() {
        return graph;
    }

    public class CrawlTask implements Runnable {
        public final String url;

        public CrawlTask(String url) {
            this.url = url;
        }

        @Override
        public void run() {
            logger.info("remaining tasks: {}, visiting {}", counter.get(), url);

            try {
                Document doc = Jsoup.connect(url).timeout(5000).get();

                // Collect absolute links that stay inside the crawled domain.
                Set<String> links = doc.select("a").stream().map(e -> e.attr("abs:href"))
                        .filter(l -> Utils.isInDomain(l, domain))
                        .map(Utils::trimURL)
                        .collect(Collectors.toSet());

                graph.put(url, links);

                for (String link : links) {
                    if (!visited.getOrDefault(link, false)) {
                        visited.put(link, true);
                        counter.getAndIncrement();
                        executorService.submit(new CrawlTask(link));
                    }
                }
            } catch (IOException e) {
                // A failed page still falls through to the bookkeeping below,
                // otherwise the counter could never reach zero and the crawl would never end.
                logger.warn("URL: {}, {}", url, e.toString());
            }

            // The last task to finish sees the counter at zero and shuts everything down.
            int n = counter.getAndDecrement();
            if (n == 0) {
                executorService.shutdown();
                try {
                    executorService.awaitTermination(1, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    done.countDown();
                }
            }
        }
    }
}

And the main function:

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

public class CounterApp {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch doneSignal = new CountDownLatch(1);
        String domain = "https://example.com";
        Worker worker = new Worker(domain, doneSignal);
        worker.start();
        doneSignal.await(); // block until the last crawl task counts the latch down

        // Print the site map collected by the crawler.
        Map<String, Set<String>> graph = worker.getGraph();
        graph.forEach((k, v) -> {
            System.out.println(k + ": ");
            v.forEach(l -> System.out.println("   " + l));
        });
    }
}
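
The Worker above relies on two helpers, Utils.isInDomain and Utils.trimURL, which are not shown here; the linked source contains the real versions. A rough sketch of what they might do, purely as an assumption:

import java.net.URI;

// Hypothetical stand-in for the Utils class referenced above; the actual
// implementation lives in the linked source code.
public class Utils {

    // Assumption: a link is "in domain" when its host matches the host of the start domain.
    public static boolean isInDomain(String link, String domain) {
        try {
            String linkHost = URI.create(link).getHost();
            String domainHost = URI.create(domain).getHost();
            return linkHost != null && linkHost.equalsIgnoreCase(domainHost);
        } catch (IllegalArgumentException e) {
            return false; // malformed urls are treated as external
        }
    }

    // Assumption: trimming strips fragments and a trailing slash so that
    // "http://example.com/a#top" and "http://example.com/a/" map to the same key.
    public static String trimURL(String url) {
        String trimmed = url.split("#")[0];
        if (trimmed.endsWith("/")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1);
        }
        return trimmed;
    }
}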

Source code here