ForkJoinFramework 只使用两个工人

ForkJoinFramework only uses two workers

我有一个应用程序可以抓取大约 6000 个 urls.To 尽量减少这项工作 我创建了一个 RecursiveTask,它使用所有 URL 的 ConcurrentLinkedQueue 来抓取。它最多拆分 50 个,如果 que 为空,它会直接抓取它,但如果不是,它首先创建一个自己的新实例并分叉它,之后它抓取 50 个子集,然后它会加入分叉的任务。

现在我的问题来了,直到每个线程都完成了他的 50 个线程,所有四个线程都同时快速工作。但是在两个停止工作并等待加入之后,只有另外两个在工作并创建新的分支和爬网页面。

为了形象化这一点,我计算了 Thread 抓取 URL 的次数,并让 JavaFX gui 显示它。

我错了什么,所以 ForkJoinFramewok 只使用了我允许的四个线程中的两个?我能做些什么来改变它?

这是我的任务计算方法:

    LOG.debug(
       Thread.currentThread().getId() + " Starting new Task with " 
          + urlsToCrawl.size() + " left."
    );
    final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
    for (int i = 0; i < urlsToCrawl.size() && i < config.getMaximumUrlsPerTask(); i++)
    {
        urlsToCrawlSubset.offer(urlsToCrawl.poll());
    }
    LOG.debug(
       Thread.currentThread().getId() + " Crated a Subset with " 
       + urlsToCrawlSubset.size() + "."
    );
    LOG.debug(
       Thread.currentThread().getId() 
       + " Now the Urls to crawl only left " + urlsToCrawl.size() + "."
    );

    if (urlsToCrawl.isEmpty())
    {
        LOG.debug(Thread.currentThread().getId() + " Crawling the subset.");
        crawlPage(urlsToCrawlSubset);
    }
    else
    {
        LOG.debug(
           Thread.currentThread().getId() 
              + " Creating a new Task and crawling the subset."
        );
        final AbstractUrlTask<T, D> otherTask = createNewOwnInstance();
        otherTask.fork();
        crawlPage(urlsToCrawlSubset);
        taskResults.addAll(otherTask.join());
    }
    return taskResults;

这是我的图表的快照:

P.s。如果我允许最多 80 个线程,它将使用它们,直到每个线程都有 50 个 URL 被抓取,然后只使用两个。

如果您有兴趣,这里是完整的源代码:https://github.com/mediathekview/MServer/tree/feature/cleanup

我修好了它。我的错误是,我分裂了然后工作了一小部分而不是等待而不是将它分成两半,然后用剩下的另一半再次打电话给自己等

换句话说之前我是直接拆分工作,但是正确的做法是拆分到全部拆分完再开始工作。

这是我的代码现在的样子:

@Override
protected Set<T> compute()
{
    if (urlsToCrawl.size() <= config.getMaximumUrlsPerTask())
    {
        crawlPage(urlsToCrawl);
    }
    else
    {
        final AbstractUrlTask<T, D> rightTask = createNewOwnInstance(createSubSet(urlsToCrawl));
        final AbstractUrlTask<T, D> leftTask = createNewOwnInstance(urlsToCrawl);
        leftTask.fork();
        taskResults.addAll(rightTask.compute());
        taskResults.addAll(leftTask.join());
    }
    return taskResults;
}

private ConcurrentLinkedQueue<D> createSubSet(final ConcurrentLinkedQueue<D> aBaseQueue)
{
    final int halfSize = aBaseQueue.size() / 2;
    final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
    for (int i = 0; i < halfSize; i++)
    {
        urlsToCrawlSubset.offer(aBaseQueue.poll());
    }
    return urlsToCrawlSubset;
}