ForkJoinFramework 只使用两个工人
ForkJoinFramework only uses two workers
我有一个应用程序可以抓取大约 6000 个 urls.To 尽量减少这项工作 我创建了一个 RecursiveTask,它使用所有 URL 的 ConcurrentLinkedQueue 来抓取。它最多拆分 50 个,如果 que 为空,它会直接抓取它,但如果不是,它首先创建一个自己的新实例并分叉它,之后它抓取 50 个子集,然后它会加入分叉的任务。
现在我的问题来了,直到每个线程都完成了他的 50 个线程,所有四个线程都同时快速工作。但是在两个停止工作并等待加入之后,只有另外两个在工作并创建新的分支和爬网页面。
为了形象化这一点,我计算了 Thread 抓取 URL 的次数,并让 JavaFX gui 显示它。
我错了什么,所以 ForkJoinFramewok 只使用了我允许的四个线程中的两个?我能做些什么来改变它?
这是我的任务计算方法:
LOG.debug(
Thread.currentThread().getId() + " Starting new Task with "
+ urlsToCrawl.size() + " left."
);
final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
for (int i = 0; i < urlsToCrawl.size() && i < config.getMaximumUrlsPerTask(); i++)
{
urlsToCrawlSubset.offer(urlsToCrawl.poll());
}
LOG.debug(
Thread.currentThread().getId() + " Crated a Subset with "
+ urlsToCrawlSubset.size() + "."
);
LOG.debug(
Thread.currentThread().getId()
+ " Now the Urls to crawl only left " + urlsToCrawl.size() + "."
);
if (urlsToCrawl.isEmpty())
{
LOG.debug(Thread.currentThread().getId() + " Crawling the subset.");
crawlPage(urlsToCrawlSubset);
}
else
{
LOG.debug(
Thread.currentThread().getId()
+ " Creating a new Task and crawling the subset."
);
final AbstractUrlTask<T, D> otherTask = createNewOwnInstance();
otherTask.fork();
crawlPage(urlsToCrawlSubset);
taskResults.addAll(otherTask.join());
}
return taskResults;
这是我的图表的快照:
P.s。如果我允许最多 80 个线程,它将使用它们,直到每个线程都有 50 个 URL 被抓取,然后只使用两个。
如果您有兴趣,这里是完整的源代码:https://github.com/mediathekview/MServer/tree/feature/cleanup
我修好了它。我的错误是,我分裂了然后工作了一小部分而不是等待而不是将它分成两半,然后用剩下的另一半再次打电话给自己等
换句话说之前我是直接拆分工作,但是正确的做法是拆分到全部拆分完再开始工作。
这是我的代码现在的样子:
@Override
protected Set<T> compute()
{
if (urlsToCrawl.size() <= config.getMaximumUrlsPerTask())
{
crawlPage(urlsToCrawl);
}
else
{
final AbstractUrlTask<T, D> rightTask = createNewOwnInstance(createSubSet(urlsToCrawl));
final AbstractUrlTask<T, D> leftTask = createNewOwnInstance(urlsToCrawl);
leftTask.fork();
taskResults.addAll(rightTask.compute());
taskResults.addAll(leftTask.join());
}
return taskResults;
}
private ConcurrentLinkedQueue<D> createSubSet(final ConcurrentLinkedQueue<D> aBaseQueue)
{
final int halfSize = aBaseQueue.size() / 2;
final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
for (int i = 0; i < halfSize; i++)
{
urlsToCrawlSubset.offer(aBaseQueue.poll());
}
return urlsToCrawlSubset;
}
我有一个应用程序可以抓取大约 6000 个 urls.To 尽量减少这项工作 我创建了一个 RecursiveTask,它使用所有 URL 的 ConcurrentLinkedQueue 来抓取。它最多拆分 50 个,如果 que 为空,它会直接抓取它,但如果不是,它首先创建一个自己的新实例并分叉它,之后它抓取 50 个子集,然后它会加入分叉的任务。
现在我的问题来了,直到每个线程都完成了他的 50 个线程,所有四个线程都同时快速工作。但是在两个停止工作并等待加入之后,只有另外两个在工作并创建新的分支和爬网页面。
为了形象化这一点,我计算了 Thread 抓取 URL 的次数,并让 JavaFX gui 显示它。
我错了什么,所以 ForkJoinFramewok 只使用了我允许的四个线程中的两个?我能做些什么来改变它?
这是我的任务计算方法:
LOG.debug(
Thread.currentThread().getId() + " Starting new Task with "
+ urlsToCrawl.size() + " left."
);
final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
for (int i = 0; i < urlsToCrawl.size() && i < config.getMaximumUrlsPerTask(); i++)
{
urlsToCrawlSubset.offer(urlsToCrawl.poll());
}
LOG.debug(
Thread.currentThread().getId() + " Crated a Subset with "
+ urlsToCrawlSubset.size() + "."
);
LOG.debug(
Thread.currentThread().getId()
+ " Now the Urls to crawl only left " + urlsToCrawl.size() + "."
);
if (urlsToCrawl.isEmpty())
{
LOG.debug(Thread.currentThread().getId() + " Crawling the subset.");
crawlPage(urlsToCrawlSubset);
}
else
{
LOG.debug(
Thread.currentThread().getId()
+ " Creating a new Task and crawling the subset."
);
final AbstractUrlTask<T, D> otherTask = createNewOwnInstance();
otherTask.fork();
crawlPage(urlsToCrawlSubset);
taskResults.addAll(otherTask.join());
}
return taskResults;
这是我的图表的快照:
P.s。如果我允许最多 80 个线程,它将使用它们,直到每个线程都有 50 个 URL 被抓取,然后只使用两个。
如果您有兴趣,这里是完整的源代码:https://github.com/mediathekview/MServer/tree/feature/cleanup
我修好了它。我的错误是,我分裂了然后工作了一小部分而不是等待而不是将它分成两半,然后用剩下的另一半再次打电话给自己等
换句话说之前我是直接拆分工作,但是正确的做法是拆分到全部拆分完再开始工作。
这是我的代码现在的样子:
@Override
protected Set<T> compute()
{
if (urlsToCrawl.size() <= config.getMaximumUrlsPerTask())
{
crawlPage(urlsToCrawl);
}
else
{
final AbstractUrlTask<T, D> rightTask = createNewOwnInstance(createSubSet(urlsToCrawl));
final AbstractUrlTask<T, D> leftTask = createNewOwnInstance(urlsToCrawl);
leftTask.fork();
taskResults.addAll(rightTask.compute());
taskResults.addAll(leftTask.join());
}
return taskResults;
}
private ConcurrentLinkedQueue<D> createSubSet(final ConcurrentLinkedQueue<D> aBaseQueue)
{
final int halfSize = aBaseQueue.size() / 2;
final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
for (int i = 0; i < halfSize; i++)
{
urlsToCrawlSubset.offer(aBaseQueue.poll());
}
return urlsToCrawlSubset;
}