如何检查 ConcurrentLinkedQueue 的 size() 或 isEmpty()
How to check the size() or isEmpty() for ConcurrentLinkedQueue
我正在尝试为 Java 中的 Web 爬虫制作一个简单的结构原型。到目前为止,原型只是试图执行以下操作:
- 用起始列表 URLs
初始化队列
- 从Queue中取出一个URL并提交到一个新的Thread
- 做一些工作,然后将 URL 添加到一组已访问的 URLs
对于开始 URLs 的队列,我使用 ConcurrentLinkedQueue
进行同步。
为了生成新线程,我正在使用 ExecutorService
.
但是在创建新线程时,应用程序需要检查 ConcurrentLinkedQueue
是否为空。我尝试使用:
.size()
.isEmpty()
但两者似乎都没有返回 ConcurrentLinkedQueue
的真实状态。
问题出在下面的块中:
while (!crawler.getUrl_horizon().isEmpty()) {
workers.submitNewWorkerThread(crawler);
}
因此,ExecutorService 在其限制内创建所有线程,即使输入只有 2 URLs。
是不是这里实现多线程的方式有问题?如果不是,检查 ConcurrentLinkedQueue 状态的更好方法是什么?
正在为应用程序启动 class:
public class CrawlerApp {
private static Crawler crawler;
public static void main(String[] args) {
crawler = = new Crawler();
initializeApp();
startCrawling();
}
private static void startCrawling() {
crawler.setUrl_visited(new HashSet<URL>());
WorkerManager workers = WorkerManager.getInstance();
while (!crawler.getUrl_horizon().isEmpty()) {
workers.submitNewWorkerThread(crawler);
}
try {
workers.getExecutor().shutdown();
workers.getExecutor().awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void initializeApp() {
Properties config = new Properties();
try {
config.load(CrawlerApp.class.getClassLoader().getResourceAsStream("url-horizon.properties"));
String[] horizon = config.getProperty("urls").split(",");
ConcurrentLinkedQueue<URL> url_horizon = new ConcurrentLinkedQueue<>();
for (String link : horizon) {
URL url = new URL();
url.setURL(link);
url_horizon.add(url);
}
crawler.setUrl_horizon(url_horizon);
} catch (IOException e) {
e.printStackTrace();
}
}
}
Crawler.java
维护 URL 的队列和已访问的 URL 的集合。
public class Crawler implements Runnable {
private ConcurrentLinkedQueue<URL> url_horizon;
public void setUrl_horizon(ConcurrentLinkedQueue<URL> url_horizon) {
this.url_horizon = url_horizon;
}
public ConcurrentLinkedQueue<URL> getUrl_horizon() {
return url_horizon;
}
private Set<URL> url_visited;
public void setUrl_visited(Set<URL> url_visited) {
this.url_visited = url_visited;
}
public Set<URL> getUrl_visited() {
return Collections.synchronizedSet(url_visited);
}
@Override
public void run() {
URL url = nextURLFromHorizon();
scrap(url);
addURLToVisited(url);
}
private URL nextURLFromHorizon() {
if (!getUrl_horizon().isEmpty()) {
URL url = url_horizon.poll();
if (getUrl_visited().contains(url)) {
return nextURLFromHorizon();
}
System.out.println("Horizon URL:" + url.getURL());
return url;
}
return null;
}
private void scrap(URL url) {
new Scrapper().scrap(url);
}
private void addURLToVisited(URL url) {
System.out.println("Adding to visited set:" + url.getURL());
getUrl_visited().add(url);
}
}
URL.java
只是 class 和 private String url
并覆盖了 hashCode()
和 equals()
.
此外,Scrapper.scrap()
到目前为止只有虚拟实现:
public void scrap(URL url){
System.out.println("Done scrapping:"+url.getURL());
}
WorkerManager
创建线程:
public class WorkerManager {
private static final Integer WORKER_LIMIT = 10;
private final ExecutorService executor = Executors.newFixedThreadPool(WORKER_LIMIT);
public ExecutorService getExecutor() {
return executor;
}
private static volatile WorkerManager instance = null;
private WorkerManager() {
}
public static WorkerManager getInstance() {
if (instance == null) {
synchronized (WorkerManager.class) {
if (instance == null) {
instance = new WorkerManager();
}
}
}
return instance;
}
public Future submitNewWorkerThread(Runnable run) {
return executor.submit(run);
}
}
问题
你最终创建的线程多于队列中 URL 的原因是因为执行器的线程中的 none 可能(事实上可能)开始直到你多次执行 while
循环。
无论何时使用线程,您都应该始终牢记,线程是独立调度的,并且 运行 按照它们自己的节奏进行调度,除非您显式同步它们。在这种情况下,线程可以在 submit()
调用后的任何时间启动,即使您似乎希望每个线程都在 [=11= 中的下一次迭代之前启动并经过 nextURLFromHorizon
]循环。
解决方案
考虑在将 Runnable
提交给执行器之前将 URL 从队列中取出。我还建议定义一次提交给执行程序的 CrawlerTask
,而不是重复提交的 Crawler
。在这样的设计中,你甚至不需要一个线程安全的容器来存放 URL 被废弃的内容。
class CrawlerTask extends Runnable {
URL url;
CrawlerTask(URL url) { this.url = url; }
@Override
public void run() {
scrape(url);
// add url to visited?
}
}
class Crawler {
ExecutorService executor;
Queue urlHorizon;
//...
private static void startCrawling() {
while (!urlHorizon.isEmpty()) {
executor.submit(new CrawlerTask(urlHorizon.poll());
}
// ...
}
}
我正在尝试为 Java 中的 Web 爬虫制作一个简单的结构原型。到目前为止,原型只是试图执行以下操作:
- 用起始列表 URLs 初始化队列
- 从Queue中取出一个URL并提交到一个新的Thread
- 做一些工作,然后将 URL 添加到一组已访问的 URLs
对于开始 URLs 的队列,我使用 ConcurrentLinkedQueue
进行同步。
为了生成新线程,我正在使用 ExecutorService
.
但是在创建新线程时,应用程序需要检查 ConcurrentLinkedQueue
是否为空。我尝试使用:
.size()
.isEmpty()
但两者似乎都没有返回 ConcurrentLinkedQueue
的真实状态。
问题出在下面的块中:
while (!crawler.getUrl_horizon().isEmpty()) {
workers.submitNewWorkerThread(crawler);
}
因此,ExecutorService 在其限制内创建所有线程,即使输入只有 2 URLs。
是不是这里实现多线程的方式有问题?如果不是,检查 ConcurrentLinkedQueue 状态的更好方法是什么?
正在为应用程序启动 class:
public class CrawlerApp {
private static Crawler crawler;
public static void main(String[] args) {
crawler = = new Crawler();
initializeApp();
startCrawling();
}
private static void startCrawling() {
crawler.setUrl_visited(new HashSet<URL>());
WorkerManager workers = WorkerManager.getInstance();
while (!crawler.getUrl_horizon().isEmpty()) {
workers.submitNewWorkerThread(crawler);
}
try {
workers.getExecutor().shutdown();
workers.getExecutor().awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void initializeApp() {
Properties config = new Properties();
try {
config.load(CrawlerApp.class.getClassLoader().getResourceAsStream("url-horizon.properties"));
String[] horizon = config.getProperty("urls").split(",");
ConcurrentLinkedQueue<URL> url_horizon = new ConcurrentLinkedQueue<>();
for (String link : horizon) {
URL url = new URL();
url.setURL(link);
url_horizon.add(url);
}
crawler.setUrl_horizon(url_horizon);
} catch (IOException e) {
e.printStackTrace();
}
}
}
Crawler.java
维护 URL 的队列和已访问的 URL 的集合。
public class Crawler implements Runnable {
private ConcurrentLinkedQueue<URL> url_horizon;
public void setUrl_horizon(ConcurrentLinkedQueue<URL> url_horizon) {
this.url_horizon = url_horizon;
}
public ConcurrentLinkedQueue<URL> getUrl_horizon() {
return url_horizon;
}
private Set<URL> url_visited;
public void setUrl_visited(Set<URL> url_visited) {
this.url_visited = url_visited;
}
public Set<URL> getUrl_visited() {
return Collections.synchronizedSet(url_visited);
}
@Override
public void run() {
URL url = nextURLFromHorizon();
scrap(url);
addURLToVisited(url);
}
private URL nextURLFromHorizon() {
if (!getUrl_horizon().isEmpty()) {
URL url = url_horizon.poll();
if (getUrl_visited().contains(url)) {
return nextURLFromHorizon();
}
System.out.println("Horizon URL:" + url.getURL());
return url;
}
return null;
}
private void scrap(URL url) {
new Scrapper().scrap(url);
}
private void addURLToVisited(URL url) {
System.out.println("Adding to visited set:" + url.getURL());
getUrl_visited().add(url);
}
}
URL.java
只是 class 和 private String url
并覆盖了 hashCode()
和 equals()
.
此外,Scrapper.scrap()
到目前为止只有虚拟实现:
public void scrap(URL url){
System.out.println("Done scrapping:"+url.getURL());
}
WorkerManager
创建线程:
public class WorkerManager {
private static final Integer WORKER_LIMIT = 10;
private final ExecutorService executor = Executors.newFixedThreadPool(WORKER_LIMIT);
public ExecutorService getExecutor() {
return executor;
}
private static volatile WorkerManager instance = null;
private WorkerManager() {
}
public static WorkerManager getInstance() {
if (instance == null) {
synchronized (WorkerManager.class) {
if (instance == null) {
instance = new WorkerManager();
}
}
}
return instance;
}
public Future submitNewWorkerThread(Runnable run) {
return executor.submit(run);
}
}
问题
你最终创建的线程多于队列中 URL 的原因是因为执行器的线程中的 none 可能(事实上可能)开始直到你多次执行 while
循环。
无论何时使用线程,您都应该始终牢记,线程是独立调度的,并且 运行 按照它们自己的节奏进行调度,除非您显式同步它们。在这种情况下,线程可以在 submit()
调用后的任何时间启动,即使您似乎希望每个线程都在 [=11= 中的下一次迭代之前启动并经过 nextURLFromHorizon
]循环。
解决方案
考虑在将 Runnable
提交给执行器之前将 URL 从队列中取出。我还建议定义一次提交给执行程序的 CrawlerTask
,而不是重复提交的 Crawler
。在这样的设计中,你甚至不需要一个线程安全的容器来存放 URL 被废弃的内容。
class CrawlerTask extends Runnable {
URL url;
CrawlerTask(URL url) { this.url = url; }
@Override
public void run() {
scrape(url);
// add url to visited?
}
}
class Crawler {
ExecutorService executor;
Queue urlHorizon;
//...
private static void startCrawling() {
while (!urlHorizon.isEmpty()) {
executor.submit(new CrawlerTask(urlHorizon.poll());
}
// ...
}
}