ExecutorService never stops when a new task is executed inside another running task

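Good day.

My web crawler project has a blocking problem. The logic is simple. First, a Runnable is created that downloads an HTML document, scans it for links, and then creates a new Runnable object for every link found. Each newly created Runnable in turn creates new Runnable objects for each of its links and executes them.

The problem is that the ExecutorService never stops.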

CrawlerTest.java

public class CrawlerTest {

    public static void main(String[] args) throws InterruptedException {
        new CrawlerService().crawlInternetResource("https://jsoup.org/");
    }
}

CrawlerService.java

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

public class CrawlerService {

    private Set<String> uniqueUrls = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>(10000));
    private ExecutorService executorService = Executors.newFixedThreadPool(8);
    private String baseDomainUrl;

    public void crawlInternetResource(String baseDomainUrl) throws InterruptedException {
        this.baseDomainUrl = baseDomainUrl;
        System.out.println("Start");
        executorService.execute(new Crawler(baseDomainUrl)); // Run the first task, which scans the main page of the domain and submits further tasks.
        executorService.awaitTermination(10, TimeUnit.MINUTES);
        System.out.println("End");
    }

    private class Crawler implements Runnable { // Inner class that encapsulates one task: load a page and scan it for links

        private String urlToCrawl;

        public Crawler(String urlToCrawl) {
            this.urlToCrawl = urlToCrawl;
        }

        public void run() {
            try {
                findAllLinks();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void findAllLinks() throws InterruptedException {
            /* Try to add the URL to the collection. If it is unique, it is added,
             * the document is scanned, and a new task is started for each link found. */
            if (uniqueUrls.add(urlToCrawl)) { 
                System.out.println(urlToCrawl);

                Document htmlDocument = loadHtmlDocument(urlToCrawl);
                Elements findedLinks = htmlDocument.select("a[href]");

                for (Element link : findedLinks) {
                    String absLink = link.attr("abs:href");
                    if (absLink.contains(baseDomainUrl) && !absLink.contains("#")) { // Check that we don't leave the domain
                        executorService.execute(new Crawler(absLink)); // Start a new task for each link found
                    }
                }
            }
        }

        private Document loadHtmlDocument(String internetResourceUrl) {
            Document document = null;
            try {
                document = Jsoup.connect(internetResourceUrl).ignoreHttpErrors(true).ignoreContentType(true)
                        .userAgent("Mozilla/5.0 (Windows NT 6.1; WOW64; rv:48.0) Gecko/20100101 Firefox/48.0")
                        .timeout(10000).get();
            } catch (IOException e) {
                System.out.println("Page load error");
                e.printStackTrace();
            }
            return document;
        }
    }
}

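This application takes about 20 seconds to scan all unique links on jsoup.org. But then it simply waits out the 10 minutes of executorService.awaitTermination(10, TimeUnit.MINUTES); after that I see a dead main thread and an executor that is still running.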

Threads

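How do I force the ExecutorService to work correctly?

I think the problem is that executorService.execute is invoked inside another task rather than in the main thread.

You are misusing awaitTermination. According to the javadoc, you should call shutdown first: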

Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.
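To make the javadoc wording above concrete, here is a minimal sketch (the class and method names are made up for illustration). It runs the same trivial task twice, once without and once with a prior shutdown() call, and reports what awaitTermination returns.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AwaitTerminationDemo {

    // Runs one trivial task and reports the result of awaitTermination.
    static boolean await(boolean callShutdownFirst, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> { });           // finishes almost immediately
        if (callShutdownFirst) {
            pool.shutdown();               // the "shutdown request" the javadoc refers to
        }
        try {
            // Without a shutdown request the pool never terminates,
            // so this call blocks for the whole timeout and returns false.
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();            // clean up the demo pool in both cases
        }
    }

    public static void main(String[] args) {
        System.out.println("without shutdown: " + await(false, 200));  // false
        System.out.println("after shutdown:   " + await(true, 5000));  // true
    }
}
```

The 10 minute wait in the question is the first case with a longer timeout: the tasks are long finished, but nothing ever asked the pool to shut down.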

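To achieve your goal, I suggest using a CountDownLatch (or a latch that supports increments, like this one) to determine the exact moment when no tasks remain, so that you can safely call shutdown.

You are not calling shutdown.

This may work: an AtomicLong variable in CrawlerService. Increment it before each new subtask is submitted to the executor service.

Modify your run() method to decrement this counter and, if it reaches 0, shut down the executor service.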

public void run() {
    try {
        findAllLinks();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        //decrements counter
        //If 0, shutdown executor from here or just notify CrawlerService who would be doing wait().
    }
}

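In the "finally" block, decrement the counter; when it reaches zero, shut down the executor or just notify CrawlerService. Zero means this is the last task: none are running, none are waiting in the queue, and no task will submit any new subtasks.

I saw your earlier comment: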
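A minimal sketch of the counter idea, under some assumptions: the names (PendingCounterDemo, pending, submit) are invented for the example, and a small binary tree of dummy tasks stands in for the pages and links, so it runs without network access.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class PendingCounterDemo {

    private final ExecutorService executorService = Executors.newFixedThreadPool(8);
    private final AtomicLong pending = new AtomicLong();        // submitted but not yet finished
    private final AtomicInteger visited = new AtomicInteger();  // demo result only

    // Increment BEFORE handing the task to the executor.
    private void submit(int depth) {
        pending.incrementAndGet();
        executorService.execute(() -> {
            try {
                visited.incrementAndGet();
                if (depth > 0) {          // stand-in for "two links found on this page"
                    submit(depth - 1);
                    submit(depth - 1);
                }
            } finally {
                // Children were counted before this decrement, so zero really
                // means: nothing running, nothing queued, nothing left to submit.
                if (pending.decrementAndGet() == 0) {
                    executorService.shutdown();
                }
            }
        });
    }

    static int crawl(int depth) {
        PendingCounterDemo demo = new PendingCounterDemo();
        demo.submit(depth);
        try {
            // Now awaitTermination returns as soon as the last task has shut the pool down.
            if (!demo.executorService.awaitTermination(1, TimeUnit.MINUTES)) {
                demo.executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            demo.executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return demo.visited.get();
    }

    public static void main(String[] args) {
        System.out.println(crawl(3)); // 15: a binary tree of depth 3 has 1 + 2 + 4 + 8 nodes
    }
}
```

The ordering is what matters here: a task increments the counter for each child before it decrements for itself, so the counter cannot touch zero while work is still outstanding.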

I can't use CountDownLatch because I don't know beforehand how many unique links I will collect from resource.

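First, vsminkov answered accurately why awaitTermination sits and waits for 10 minutes. I will offer an alternative solution.

Instead of a CountDownLatch, use a Phaser. For each new task, you can register it and then wait for completion.

Create a single Phaser, register each time execute/submit is invoked, and arrive each time a Runnable completes.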

public void crawlInternetResource(String baseDomainUrl) {
    this.baseDomainUrl = baseDomainUrl;

    Phaser phaser = new Phaser();
    executorService.execute(new Crawler(phaser, baseDomainUrl)); 
    int phase = phaser.getPhase();
    phaser.awaitAdvance(phase); // blocks until every registered task has arrived
}

private class Crawler implements Runnable { 

    private final Phaser phaser;
    private String urlToCrawl;

    public Crawler(Phaser phaser, String urlToCrawl) {
        this.urlToCrawl = urlToCrawl;
        this.phaser = phaser;
        phaser.register(); // register new task
    }

    public void run(){
       ...
       phaser.arrive(); //may want to surround this in try/finally
    }
}
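For reference, a self-contained sketch of the Phaser approach. Everything beyond the Phaser calls is an assumption made for the demo: the class name, the Task class, and the dummy task tree that replaces real page downloads. It also reads the phase number before submitting the first task, so a very fast run cannot advance the phase before it is read.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserCrawlDemo {

    private final ExecutorService executorService = Executors.newFixedThreadPool(8);
    private final Phaser phaser = new Phaser();                 // starts with zero parties
    private final AtomicInteger visited = new AtomicInteger();  // demo result only

    private class Task implements Runnable {
        private final int depth;

        Task(int depth) {
            this.depth = depth;
            phaser.register(); // one party per task, registered before the parent arrives
        }

        @Override
        public void run() {
            try {
                visited.incrementAndGet();
                if (depth > 0) {   // stand-in for "two links found on this page"
                    executorService.execute(new Task(depth - 1));
                    executorService.execute(new Task(depth - 1));
                }
            } finally {
                phaser.arrive();   // always arrive, even if the task fails
            }
        }
    }

    static int crawl(int depth) {
        PhaserCrawlDemo demo = new PhaserCrawlDemo();
        int phase = demo.phaser.getPhase();     // read before any task can arrive
        demo.executorService.execute(demo.new Task(depth));
        demo.phaser.awaitAdvance(phase);        // returns once all registered tasks have arrived
        demo.executorService.shutdown();        // safe now: no task will submit anything else
        return demo.visited.get();
    }

    public static void main(String[] args) {
        System.out.println(crawl(3)); // 15: a binary tree of depth 3 has 1 + 2 + 4 + 8 nodes
    }
}
```

One caveat: a single Phaser supports at most 65535 registered parties, and with arrive() the parties stay registered, so a crawl that creates more tasks than that would need arriveAndDeregister() or a different approach.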

How to force ExecutorService work correctly?

I think problem is that it invoke executorService.execute inside another task instead in main thread.

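No. The problem is not with ExecutorService. You are using the API incorrectly, so you are not getting the right result.

You have to use three APIs in a specific order to get the right result.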

1. shutdown
2. awaitTermination
3. shutdownNow

The recommended approach from the Oracle documentation page for ExecutorService:

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }

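shutdown(): Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.

shutdownNow(): Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.

awaitTermination(): Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

On a side note: if you want to wait for all tasks to complete, refer to this related SE question: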

wait until all threads finish their work in java

I prefer using invokeAll() or ForkJoinPool(), which are best suited to your use case.
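As a rough sketch of the ForkJoinPool variant: a RecursiveAction forks one child action per link and waits for them with ForkJoinTask.invokeAll, so the top-level invoke() returns only when the whole tree is done. The class names and the dummy task tree are assumptions for the demo. A real crawler does blocking I/O in each task, which a ForkJoinPool is not primarily designed for, so treat this as an illustration of the structure rather than a tuned solution.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

class ForkJoinCrawlDemo {

    private static final class PageAction extends RecursiveAction {
        private final int depth;
        private final AtomicInteger visited;

        PageAction(int depth, AtomicInteger visited) {
            this.depth = depth;
            this.visited = visited;
        }

        @Override
        protected void compute() {
            visited.incrementAndGet();
            if (depth == 0) {
                return;
            }
            // Stand-in for "one child action per link found on this page".
            List<PageAction> children = new ArrayList<>();
            children.add(new PageAction(depth - 1, visited));
            children.add(new PageAction(depth - 1, visited));
            invokeAll(children); // forks the children and returns when all are complete
        }
    }

    static int crawl(int depth) {
        AtomicInteger visited = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(8);
        try {
            pool.invoke(new PageAction(depth, visited)); // blocks until the whole tree is done
        } finally {
            pool.shutdown();
        }
        return visited.get();
    }

    public static void main(String[] args) {
        System.out.println(crawl(3)); // 15: a binary tree of depth 3 has 1 + 2 + 4 + 8 nodes
    }
}
```

No counter or latch is needed here, because each parent task completes only after its children have completed.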