Python 并发 executor.map() 和 submit()

Python concurrent executor.map() and submit()

我正在学习如何与 executor.map()executor.submit() 并发使用。

我有一个包含 20 个 url 的列表,我想同时发送 20 个请求,问题是 .submit() returns 结果与给定列表的顺序不同开始。我读过 map() 可以满足我的需要,但我不知道如何用它编写代码。

下面的代码非常适合我。

问题:是否有map()的代码块等价于下面的代码,或者有什么排序方法可以按照给定列表的顺序对submit()的结果列表进行排序?

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the url and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

这是您现有代码的地图版本。请注意,回调现在接受一个元组作为参数。我在回调中添加了一个 try\except 所以结果不会抛出错误。结果按照输入列表排序。

from concurrent.futures import ThreadPoolExecutor
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://www.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the url and contents
def load_url(tt):  # (url,timeout)
    url, timeout = tt
    try:
      with urllib.request.urlopen(url, timeout=timeout) as conn:
         return (url, conn.read())
    except Exception as ex:
        print("Error:", url, ex)
        return(url,"")  # error, return empty string

with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(load_url, [(u,60) for u in URLS])  # pass url and timeout as tuple to callback
    executor.shutdown(wait=True) # wait for all complete
    print("Results:")
for r in results:  # ordered results, will throw exception here if not handled in callback
    print('   %r page is %d bytes' % (r[0], len(r[1])))

输出

Error: http://www.wsj.com/ HTTP Error 404: Not Found
Results:
   'http://www.foxnews.com/' page is 320028 bytes
   'http://www.cnn.com/' page is 1144916 bytes
   'http://www.wsj.com/' page is 0 bytes
   'http://www.bbc.co.uk/' page is 279418 bytes
   'http://some-made-up-domain.com/' page is 64668 bytes

不使用 map 方法,您可以使用 enumerate 构建 future_to_url 字典,不仅将 URL 作为值,还包括它们在列表中的索引。然后,您可以从调用 concurrent.futures.as_completed(future_to_url) 返回的 future 对象构建一个字典,索引作为键,这样您就可以在字典的长度上迭代索引以读取相同的字典排序为原始列表中的相应项目:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {
        executor.submit(load_url, url, 60): (i, url) for i, url in enumerate(URLS)
    }
    futures = {}
    for future in concurrent.futures.as_completed(future_to_url):
        i, url = future_to_url[future]
        futures[i] = url, future
    for i in range(len(futures)):
        url, future = futures[i]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))