如何检查 ConcurrentLinkedQueue 的 size() 或 isEmpty()

How to check the size() or isEmpty() for ConcurrentLinkedQueue

我正在尝试为 Java 中的 Web 爬虫制作一个简单的结构原型。到目前为止,原型只是试图执行以下操作:

对于开始 URLs 的队列,我使用 ConcurrentLinkedQueue 进行同步。 为了生成新线程,我正在使用 ExecutorService.

但是在创建新线程时,应用程序需要检查 ConcurrentLinkedQueue 是否为空。我尝试使用:

但两者似乎都没有返回 ConcurrentLinkedQueue 的真实状态。

问题出在下面的块中:

while (!crawler.getUrl_horizon().isEmpty()) {
                workers.submitNewWorkerThread(crawler);
            }

因此,ExecutorService 在其限制内创建所有线程,即使输入只有 2 URLs。

是不是这里实现多线程的方式有问题?如果不是,检查 ConcurrentLinkedQueue 状态的更好方法是什么?

正在为应用程序启动 class:

public class CrawlerApp {

    private static Crawler crawler;

    public static void main(String[] args) {
        crawler = = new Crawler();
        initializeApp();
        startCrawling();

    }

    private static void startCrawling() {
        crawler.setUrl_visited(new HashSet<URL>());
        WorkerManager workers = WorkerManager.getInstance();
        while (!crawler.getUrl_horizon().isEmpty()) {
            workers.submitNewWorkerThread(crawler);
        }
        try {
            workers.getExecutor().shutdown();
            workers.getExecutor().awaitTermination(10, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void initializeApp() {

        Properties config = new Properties();
        try {
            config.load(CrawlerApp.class.getClassLoader().getResourceAsStream("url-horizon.properties"));
            String[] horizon = config.getProperty("urls").split(",");
            ConcurrentLinkedQueue<URL> url_horizon = new ConcurrentLinkedQueue<>();
            for (String link : horizon) {
                URL url = new URL();
                url.setURL(link);
                url_horizon.add(url);
            }
            crawler.setUrl_horizon(url_horizon);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

Crawler.java 维护 URL 的队列和已访问的 URL 的集合。

public class Crawler implements Runnable {
    private ConcurrentLinkedQueue<URL> url_horizon;

    public void setUrl_horizon(ConcurrentLinkedQueue<URL> url_horizon) {
        this.url_horizon = url_horizon;
    }

    public ConcurrentLinkedQueue<URL> getUrl_horizon() {
        return url_horizon;
    }

    private Set<URL> url_visited;

    public void setUrl_visited(Set<URL> url_visited) {
        this.url_visited = url_visited;
    }

    public Set<URL> getUrl_visited() {
        return Collections.synchronizedSet(url_visited);
    }

    @Override
    public void run() {
        URL url = nextURLFromHorizon();
        scrap(url);
        addURLToVisited(url);

    }

    private URL nextURLFromHorizon() {
        if (!getUrl_horizon().isEmpty()) {
            URL url = url_horizon.poll();
            if (getUrl_visited().contains(url)) {
                return nextURLFromHorizon();
            }
            System.out.println("Horizon URL:" + url.getURL());
            return url;

        }
        return null;

    }

    private void scrap(URL url) {
        new Scrapper().scrap(url);
    }

    private void addURLToVisited(URL url) {
        System.out.println("Adding to visited set:" + url.getURL());
        getUrl_visited().add(url);
    }

}

URL.java 只是 class 和 private String url 并覆盖了 hashCode()equals().

此外,Scrapper.scrap() 到目前为止只有虚拟实现:

public void scrap(URL url){
        System.out.println("Done scrapping:"+url.getURL());
    }

WorkerManager 创建线程:

public class WorkerManager {
    private static final Integer WORKER_LIMIT = 10;
    private final ExecutorService executor = Executors.newFixedThreadPool(WORKER_LIMIT);

    public ExecutorService getExecutor() {
        return executor;
    }

    private static volatile WorkerManager instance = null;

    private WorkerManager() {
    }

    public static WorkerManager getInstance() {
        if (instance == null) {
            synchronized (WorkerManager.class) {
                if (instance == null) {
                    instance = new WorkerManager();
                }
            }
        }

        return instance;
    }

    public Future submitNewWorkerThread(Runnable run) {
        return executor.submit(run);
    }

}

问题

你最终创建的线程多于队列中 URL 的原因是因为执行器的线程中的 none 可能(事实上可能)开始直到你多次执行 while 循环。

无论何时使用线程,您都应该始终牢记,线程是独立调度的,并且 运行 按照它们自己的节奏进行调度,除非您显式同步它们。在这种情况下,线程可以在 submit() 调用后的任何时间启动,即使您似乎希望每个线程都在 [=11= 中的下一次迭代之前启动并经过 nextURLFromHorizon ]循环。

解决方案

考虑在将 Runnable 提交给执行器之前将 URL 从队列中取出。我还建议定义一次提交给执行程序的 CrawlerTask,而不是重复提交的 Crawler。在这样的设计中,你甚至不需要一个线程安全的容器来存放 URL 被废弃的内容。

class CrawlerTask extends Runnable {
   URL url;

   CrawlerTask(URL url) { this.url = url; }

   @Override
   public void run() {
     scrape(url);
     // add url to visited?
   }
}

class Crawler {
  ExecutorService executor;
  Queue urlHorizon;

  //...

  private static void startCrawling() {
    while (!urlHorizon.isEmpty()) {
      executor.submit(new CrawlerTask(urlHorizon.poll());
    }
    // ...
  }
}