多线程使用队列和 futures.ThreadPoolExecutor 使用 python3 中的列表之间的性能差异?

Performance difference between multithread using queue and futures.ThreadPoolExecutor using list in python3?

我正在尝试使用 python 多线程的各种方法,看看哪种方法符合我的要求。总的来说,我有一堆项目需要发送到 API。然后根据响应,一些项目将进入数据库并记录所有项目;例如,对于一个项目,如果 API returns 成功,该项目将只被记录,但当它 returns 失败时,该项目将被发送到数据库以供将来重试以及记录。

现在,根据 API 响应,我可以将成功项与失败项分开,并对所有失败项进行批量查询,这将提高我的数据库性能。为此,我在一个地方累积所有请求并尝试执行多线程 API 调用(因为这是一个 IO 绑定任务,我什至没有考虑多处理)但同时我需要保持跟踪哪个响应属于哪个请求。

谈到实际问题,我尝试了两种不同的方法,我认为它们可以提供几乎相同的性能,但结果却有很大的不同。

为了模拟 API 调用,我在我的本地主机上创建了一个 API,睡眠时间为 500 毫秒(平均处理时间)。请注意,我想在所有 API 调用完成后开始记录并插入数据库。

方法 - 1(使用 threading.Thread 和 queue.Queue())

import requests
import datetime
import threading
import queue

def target(data_q):
    while not data_q.empty():
        data_q.get()
        response = requests.get("https://postman-echo.com/get?foo1=bar1&foo2=bar2")
        print(response.status_code)
        data_q.task_done()

if __name__ == "__main__":
    data_q = queue.Queue()
    for i in range(0, 20):
        data_q.put(i)

    start = datetime.datetime.now()
    num_thread = 5
    for _ in range(num_thread):
        worker = threading.Thread(target=target(data_q))
        worker.start()

    data_q.join()

    print('Time taken multi-threading: '+str(datetime.datetime.now() - start))

我试了5次、10次、20次、30次,结果如下,
多线程所用时间:0:00:06.625710
多线程所用时间:0:00:13.326969
多线程所用时间:0:00:26.435534
多线程所用时间:0:00:40.737406

令我震惊的是,我在没有多线程的情况下尝试了同样的方法,并获得了几乎相同的性能。

然后在谷歌搜索之后,我被介绍到期货模块。

方法 - 2(使用 concurrent.futures)

def fetch_url(im_url):
    try:
        response = requests.get(im_url)
        return response.status_code
    except Exception as e:
        traceback.print_exc()

if __name__ == "__main__":
    data = []
    for i in range(0, 20):
        data.append(i)

    start = datetime.datetime.now()
    urls = ["https://postman-echo.com/get?foo1=bar1&foo2=bar2" + str(item) for item in data]
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        responses = executor.map(fetch_url, urls)
        for ret in responses:
            print(ret)
    print('Time taken future concurrent: ' + str(datetime.datetime.now() - start))

再进行5、10、20、30次,结果如下,
未来并发所用时间:0:00:01.276891
未来并发所用时间:0:00:02.635949
未来并发所用时间:0:00:05.073299
未来并发所用时间:0:00:07.296873

听说过asyncio,但还没用过。我还读到它提供了比 futures.ThreadPoolExecutor().

更好的性能

最后一个问题,如果两种方法都使用线程(或者我认为如此)那么为什么会有巨大的性能差距?我做错了什么吗?我环顾四周。未能找到令人满意的答案。对此有任何想法将不胜感激。感谢您回答问题。

[编辑 1]整个过程 运行 宁 python 3.8。 [编辑 2] 更新了代码示例和执行时间。现在他们应该 运行 在任何人的系统上。

documentation of ThreadPoolExecutor 详细解释了在未给出 max_workers 参数时启动了多少个线程,如您的示例所示。行为因确切的 Python 版本而异,但启动的任务数很可能超过 3,即使用队列的第一个版本中的线程数。您应该使用 futures.ThreadPoolExecutor(max_workers= 3) 来比较这两种方法。

对于更新的方法 - 1 我建议稍微修改一下 for 循环:

for _ in range(num_thread):
    target_to_run= target(data_q)
    print('target to run: {}'.format(target_to_run))
    worker = threading.Thread(target= target_to_run)
    worker.start()

输出将是这样的:

200
...
200
200
target to run: None
target to run: None
target to run: None
target to run: None
target to run: None
Time taken multi-threading: 0:00:10.846368

问题是 Thread 构造函数需要一个可调用对象或 None 作为它的 target。你没有给它一个可调用的,而是队列处理发生在主线程第一次调用 target(data_q) 时,并且启动了 5 个线程什么都不做,因为它们的 targetNone.