使用工作人员可以向其添加任务的共享队列
Using a shared queue that workers can add tasks to
我是 python 的新手(我主要在 Java 中编写代码)。我有一个 python 脚本,它本质上是一个爬虫。它调用加载页面的 phantomjs,returns 它的来源,以及它在页面中找到的 url 列表。
我一直在尝试使用 Python 3 的 multiprocessing
模块来执行此操作,但我不知道如何使用工作人员也可以添加到的共享队列。我不断收到不可预测的结果。
我以前的方法使用 URL 的全局列表,我从中提取了一个块并使用 map_async
发送给工作人员。最后,我会收集所有返回的 URLs 并将它们附加到全局列表中。问题是每个 "chunk" 需要和最慢的工人一样长的时间。我正在尝试对其进行修改,以便每当 worker 完成时,它都可以拿起下一个 URL。但是,我认为我做的不对。这是我目前所拥有的:
def worker(url, urls):
print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
returned_urls = phantomjs(url)
print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs")
for returned_url in returned_urls:
urls.put(returned_url, block=True)
print("There are " + str(urls.qsize()) + " URLs in total.\n")
if __name__ == '__main__':
manager = multiprocessing.Manager()
urls = manager.Queue()
urls.append(<some-url>)
pool = Pool()
while True:
url = urls.get(block=True)
pool.apply_async(worker, (url, urls))
pool.close()
pool.join()
如果有更好的方法,请告诉我。我正在抓取一个已知站点,最终终止条件是没有 URL 需要处理。但现在看来我将永远保留 运行。我不确定我是否会使用 queue.empty()
,因为它确实说它不可靠。
我就是这样解决问题的。我最初使用 this answer but bj0 中发布的设计,提到它滥用了初始化函数。所以我决定使用 apply_async
以类似于我问题中发布的代码的方式来做到这一点。
由于我的工作人员修改了他们正在从中读取 URL 的队列(他们添加到其中),我想我可以简单地 运行 我的循环,就像这样:
while not urls.empty():
pool.apply_async(worker, (urls.get(), urls))
我预计这会起作用,因为工作人员将添加到队列中,如果所有工作人员都忙,apply_async
会等待。 这没有像我预期的那样工作,循环提前终止。问题是不清楚如果所有工作人员都忙,apply_async
不会阻塞 。相反,它会将提交的任务排队,这意味着 urls
最终将变为空并且循环将终止。循环阻塞的唯一情况是当您尝试执行 urls.get()
时队列为空。此时,它将等待队列中有更多项目可用。但我仍然需要找出一种终止循环的方法。条件是循环应该在 none 的工作人员 return 新 URL 时终止。为此,我使用共享字典,如果进程没有 return 任何 URL,则将与进程名称关联的值设置为 0,否则设置为 1。我在循环的每次迭代中检查键的总和,如果它是 0,我就知道我完成了。
最终的基本结构是这样的:
def worker(url, url_queue, proc_user_urls_queue, proc_empty_urls_queue):
returned_urls = phantomjs(url) # calls phantomjs and waits for output
if len(returned_urls) > 0:
proc_empty_urls_queue.put(
[multiprocessing.current_process().name, 1]
)
else:
proc_empty_urls_queue.put(
[multiprocessing.current_process().name, 0]
)
for returned_url in returned_urls:
url_queue.put(returned_url)
def empty_url_tallier(proc_empty_urls_queue, proc_empty_urls_dict):
while 1:
# This may not be necessary. I don't know if this worker is run
# by the same process every time. If not, it is possible that
# the worker was assigned the task of fetching URLs, and returned
# some. So let's make sure that we set its entry to zero anyway.
# If this worker is run by the same process every time, then this
# stuff is not necessary.
id = multiprocessing.current_process().name
proc_empty_urls_dict[id] = 0
proc_empty_urls = proc_empty_urls_queue.get()
if proc_empty_urls == "done": # poison pill
break
proc_id = proc_empty_urls[0]
proc_empty_url = proc_empty_urls[1]
proc_empty_urls_dict[proc_id] = proc_empty_url
manager = Manager()
urls = manager.Queue()
proc_empty_urls_queue = manager.Queue()
proc_empty_urls_dict = manager.dict()
pool = Pool(33)
pool.apply_async(writer, (proc_user_urls_queue,))
pool.apply_async(empty_url_tallier, (proc_empty_urls_queue, proc_empty_urls_dict))
# Run the first apply synchronously
urls.put("<some-url>")
pool.apply(worker, (urls.get(), urls, proc_empty_urls_queue))
while sum(proc_empty_urls_dict.values()) > 0:
pool.apply_async(worker, (urls.get(), urls, proc_empty_urls_queue))
proc_empty_urls_queue.put("done") # poison pill
pool.close()
pool.join()
以下是我可能会做的事情:
def worker(url, urls):
print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
returned_urls = phantomjs(url)
print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs")
for returned_url in returned_urls:
urls.put(returned_url, block=True)
# signal finished processing this url
urls.put('no-url')
print("There are " + str(urls.qsize()) + " URLs in total.\n")
if __name__ == '__main__':
manager = multiprocessing.Manager()
pool = Pool()
urls = manager.Queue()
# start first url before entering loop
counter = 1
pool.apply_async(worker, (<some-url>, urls))
while counter > 0:
url = urls.get(block=True)
if url == 'no-url':
# a url has finished processing
counter -= 1
else:
# a new url needs to be processed
counter += 1
pool.apply_async(worker, (url, urls))
pool.close()
pool.join()
每当 url 从队列中弹出时,增加计数器。将其视为 "currently processing url" 计数器。当 'no-url' 从队列中弹出时, "currently processing url" 已完成,因此递减计数器。只要计数器大于0,就有url个还没有处理完返回'no-url'
编辑
正如我在评论中所说(为阅读它的其他人放在这里),当使用 multiprocessing.Pool
时,与其将其视为单独的进程,不如将其视为单一结构每次获取数据时都会执行您的函数(尽可能同时执行)。这对于数据驱动的问题最有用,在这些问题中您不跟踪或关心单个工作进程,只处理正在处理的数据。
我是 python 的新手(我主要在 Java 中编写代码)。我有一个 python 脚本,它本质上是一个爬虫。它调用加载页面的 phantomjs,returns 它的来源,以及它在页面中找到的 url 列表。
我一直在尝试使用 Python 3 的 multiprocessing
模块来执行此操作,但我不知道如何使用工作人员也可以添加到的共享队列。我不断收到不可预测的结果。
我以前的方法使用 URL 的全局列表,我从中提取了一个块并使用 map_async
发送给工作人员。最后,我会收集所有返回的 URLs 并将它们附加到全局列表中。问题是每个 "chunk" 需要和最慢的工人一样长的时间。我正在尝试对其进行修改,以便每当 worker 完成时,它都可以拿起下一个 URL。但是,我认为我做的不对。这是我目前所拥有的:
def worker(url, urls):
print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
returned_urls = phantomjs(url)
print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs")
for returned_url in returned_urls:
urls.put(returned_url, block=True)
print("There are " + str(urls.qsize()) + " URLs in total.\n")
if __name__ == '__main__':
manager = multiprocessing.Manager()
urls = manager.Queue()
urls.append(<some-url>)
pool = Pool()
while True:
url = urls.get(block=True)
pool.apply_async(worker, (url, urls))
pool.close()
pool.join()
如果有更好的方法,请告诉我。我正在抓取一个已知站点,最终终止条件是没有 URL 需要处理。但现在看来我将永远保留 运行。我不确定我是否会使用 queue.empty()
,因为它确实说它不可靠。
我就是这样解决问题的。我最初使用 this answer but bj0 中发布的设计,提到它滥用了初始化函数。所以我决定使用 apply_async
以类似于我问题中发布的代码的方式来做到这一点。
由于我的工作人员修改了他们正在从中读取 URL 的队列(他们添加到其中),我想我可以简单地 运行 我的循环,就像这样:
while not urls.empty():
pool.apply_async(worker, (urls.get(), urls))
我预计这会起作用,因为工作人员将添加到队列中,如果所有工作人员都忙,apply_async
会等待。 这没有像我预期的那样工作,循环提前终止。问题是不清楚如果所有工作人员都忙,apply_async
不会阻塞 。相反,它会将提交的任务排队,这意味着 urls
最终将变为空并且循环将终止。循环阻塞的唯一情况是当您尝试执行 urls.get()
时队列为空。此时,它将等待队列中有更多项目可用。但我仍然需要找出一种终止循环的方法。条件是循环应该在 none 的工作人员 return 新 URL 时终止。为此,我使用共享字典,如果进程没有 return 任何 URL,则将与进程名称关联的值设置为 0,否则设置为 1。我在循环的每次迭代中检查键的总和,如果它是 0,我就知道我完成了。
最终的基本结构是这样的:
def worker(url, url_queue, proc_user_urls_queue, proc_empty_urls_queue):
returned_urls = phantomjs(url) # calls phantomjs and waits for output
if len(returned_urls) > 0:
proc_empty_urls_queue.put(
[multiprocessing.current_process().name, 1]
)
else:
proc_empty_urls_queue.put(
[multiprocessing.current_process().name, 0]
)
for returned_url in returned_urls:
url_queue.put(returned_url)
def empty_url_tallier(proc_empty_urls_queue, proc_empty_urls_dict):
while 1:
# This may not be necessary. I don't know if this worker is run
# by the same process every time. If not, it is possible that
# the worker was assigned the task of fetching URLs, and returned
# some. So let's make sure that we set its entry to zero anyway.
# If this worker is run by the same process every time, then this
# stuff is not necessary.
id = multiprocessing.current_process().name
proc_empty_urls_dict[id] = 0
proc_empty_urls = proc_empty_urls_queue.get()
if proc_empty_urls == "done": # poison pill
break
proc_id = proc_empty_urls[0]
proc_empty_url = proc_empty_urls[1]
proc_empty_urls_dict[proc_id] = proc_empty_url
manager = Manager()
urls = manager.Queue()
proc_empty_urls_queue = manager.Queue()
proc_empty_urls_dict = manager.dict()
pool = Pool(33)
pool.apply_async(writer, (proc_user_urls_queue,))
pool.apply_async(empty_url_tallier, (proc_empty_urls_queue, proc_empty_urls_dict))
# Run the first apply synchronously
urls.put("<some-url>")
pool.apply(worker, (urls.get(), urls, proc_empty_urls_queue))
while sum(proc_empty_urls_dict.values()) > 0:
pool.apply_async(worker, (urls.get(), urls, proc_empty_urls_queue))
proc_empty_urls_queue.put("done") # poison pill
pool.close()
pool.join()
以下是我可能会做的事情:
def worker(url, urls):
print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url)
returned_urls = phantomjs(url)
print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs")
for returned_url in returned_urls:
urls.put(returned_url, block=True)
# signal finished processing this url
urls.put('no-url')
print("There are " + str(urls.qsize()) + " URLs in total.\n")
if __name__ == '__main__':
manager = multiprocessing.Manager()
pool = Pool()
urls = manager.Queue()
# start first url before entering loop
counter = 1
pool.apply_async(worker, (<some-url>, urls))
while counter > 0:
url = urls.get(block=True)
if url == 'no-url':
# a url has finished processing
counter -= 1
else:
# a new url needs to be processed
counter += 1
pool.apply_async(worker, (url, urls))
pool.close()
pool.join()
每当 url 从队列中弹出时,增加计数器。将其视为 "currently processing url" 计数器。当 'no-url' 从队列中弹出时, "currently processing url" 已完成,因此递减计数器。只要计数器大于0,就有url个还没有处理完返回'no-url'
编辑
正如我在评论中所说(为阅读它的其他人放在这里),当使用 multiprocessing.Pool
时,与其将其视为单独的进程,不如将其视为单一结构每次获取数据时都会执行您的函数(尽可能同时执行)。这对于数据驱动的问题最有用,在这些问题中您不跟踪或关心单个工作进程,只处理正在处理的数据。