使用工作人员可以向其添加任务的共享队列

Using a shared queue that workers can add tasks to

我是 python 的新手(我主要在 Java 中编写代码)。我有一个 python 脚本,它本质上是一个爬虫。它调用加载页面的 phantomjs,returns 它的来源,以及它在页面中找到的 url 列表。

我一直在尝试使用 Python 3 的 multiprocessing 模块来执行此操作,但我不知道如何使用工作人员也可以添加到的共享队列。我不断收到不可预测的结果。

我以前的方法使用 URL 的全局列表,我从中提取了一个块并使用 map_async 发送给工作人员。最后,我会收集所有返回的 URLs 并将它们附加到全局列表中。问题是每个 "chunk" 需要和最慢的工人一样长的时间。我正在尝试对其进行修改,以便每当 worker 完成时,它都可以拿起下一个 URL。但是,我认为我做的不对。这是我目前所拥有的:

def worker(url, urls):
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
    returned_urls = phantomjs(url)
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs")

    for returned_url in returned_urls:
        urls.put(returned_url, block=True)

    print("There are " + str(urls.qsize()) + " URLs in total.\n")

if __name__ == '__main__':    
    manager = multiprocessing.Manager()
    urls = manager.Queue()
    urls.append(<some-url>)

    pool = Pool()
    while True:
        url = urls.get(block=True)
        pool.apply_async(worker, (url, urls))

    pool.close()
    pool.join()

如果有更好的方法,请告诉我。我正在抓取一个已知站点,最终终止条件是没有 URL 需要处理。但现在看来我将永远保留 运行。我不确定我是否会使用 queue.empty(),因为它确实说它不可靠。

我就是这样解决问题的。我最初使用 this answer but bj0 中发布的设计,提到它滥用了初始化函数。所以我决定使用 apply_async 以类似于我问题中发布的代码的方式来做到这一点。

由于我的工作人员修改了他们正在从中读取 URL 的队列(他们添加到其中),我想我可以简单地 运行 我的循环,就像这样:

while not urls.empty():
   pool.apply_async(worker, (urls.get(), urls))

我预计这会起作用,因为工作人员将添加到队列中,如果所有工作人员都忙,apply_async 会等待。 这没有像我预期的那样工作,循环提前终止。问题是不清楚如果所有工作人员都忙,apply_async 不会阻塞 。相反,它会将提交的任务排队,这意味着 urls 最终将变为空并且循环将终止。循环阻塞的唯一情况是当您尝试执行 urls.get() 时队列为空。此时,它将等待队列中有更多项目可用。但我仍然需要找出一种终止循环的方法。条件是循环应该在 none 的工作人员 return 新 URL 时终止。为此,我使用共享字典,如果进程没有 return 任何 URL,则将与进程名称关联的值设置为 0,否则设置为 1。我在循环的每次迭代中检查键的总和,如果它是 0,我就知道我完成了。

最终的基本结构是这样的:

def worker(url, url_queue, proc_user_urls_queue, proc_empty_urls_queue):

    returned_urls = phantomjs(url) # calls phantomjs and waits for output
    if len(returned_urls) > 0:
        proc_empty_urls_queue.put(
            [multiprocessing.current_process().name, 1]
        )
    else:
        proc_empty_urls_queue.put(
            [multiprocessing.current_process().name, 0]
        )

    for returned_url in returned_urls:
        url_queue.put(returned_url)

def empty_url_tallier(proc_empty_urls_queue, proc_empty_urls_dict):
    while 1:
        # This may not be necessary. I don't know if this worker is run
        # by the same process every time. If not, it is possible that
        # the worker was assigned the task of fetching URLs, and returned
        # some. So let's make sure that we set its entry to zero anyway.
        # If this worker is run by the same process every time, then this
        # stuff is not necessary.
        id = multiprocessing.current_process().name
        proc_empty_urls_dict[id] = 0

        proc_empty_urls = proc_empty_urls_queue.get()
        if proc_empty_urls == "done": # poison pill
            break

        proc_id = proc_empty_urls[0]
        proc_empty_url = proc_empty_urls[1]
        proc_empty_urls_dict[proc_id] = proc_empty_url

manager = Manager()

urls = manager.Queue()
proc_empty_urls_queue = manager.Queue()
proc_empty_urls_dict = manager.dict()

pool = Pool(33)

pool.apply_async(writer, (proc_user_urls_queue,))
pool.apply_async(empty_url_tallier, (proc_empty_urls_queue, proc_empty_urls_dict))

# Run the first apply synchronously 
urls.put("<some-url>")
pool.apply(worker, (urls.get(), urls, proc_empty_urls_queue))
while sum(proc_empty_urls_dict.values()) > 0:
    pool.apply_async(worker, (urls.get(), urls, proc_empty_urls_queue))

proc_empty_urls_queue.put("done") # poison pill
pool.close()
pool.join()

以下是我可能会做的事情:

def worker(url, urls):
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
    returned_urls = phantomjs(url)
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs")

      for returned_url in returned_urls:
          urls.put(returned_url, block=True)

      # signal finished processing this url
      urls.put('no-url')

    print("There are " + str(urls.qsize()) + " URLs in total.\n")

if __name__ == '__main__':    
    manager = multiprocessing.Manager()
    pool = Pool()
    urls = manager.Queue()

    # start first url before entering loop
    counter = 1
    pool.apply_async(worker, (<some-url>, urls))

    while counter > 0:
        url = urls.get(block=True)
        if url == 'no-url':
            # a url has finished processing
            counter -= 1
        else:
            # a new url needs to be processed
            counter += 1
            pool.apply_async(worker, (url, urls))

    pool.close()
    pool.join()

每当 url 从队列中弹出时,增加计数器。将其视为 "currently processing url" 计数器。当 'no-url' 从队列中弹出时, "currently processing url" 已完成,因此递减计数器。只要计数器大于0,就有url个还没有处理完返回'no-url'

编辑

正如我在评论中所说(为阅读它的其他人放在这里),当使用 multiprocessing.Pool 时,与其将其视为单独的进程,不如将其视为单一结构每次获取数据时都会执行您的函数(尽可能同时执行)。这对于数据驱动的问题最有用,在这些问题中您不跟踪或关心单个工作进程,只处理正在处理的数据。