多线程使用队列和 futures.ThreadPoolExecutor 使用 python3 中的列表之间的性能差异?
Performance difference between multithread using queue and futures.ThreadPoolExecutor using list in python3?
我正在尝试使用 python 多线程的各种方法,看看哪种方法符合我的要求。总的来说,我有一堆项目需要发送到 API。然后根据响应,一些项目将进入数据库并记录所有项目;例如,对于一个项目,如果 API returns 成功,该项目将只被记录,但当它 returns 失败时,该项目将被发送到数据库以供将来重试以及记录。
现在,根据 API 响应,我可以将成功项与失败项分开,并对所有失败项进行批量查询,这将提高我的数据库性能。为此,我在一个地方累积所有请求并尝试执行多线程 API 调用(因为这是一个 IO 绑定任务,我什至没有考虑多处理)但同时我需要保持跟踪哪个响应属于哪个请求。
谈到实际问题,我尝试了两种不同的方法,我认为它们可以提供几乎相同的性能,但结果却有很大的不同。
为了模拟 API 调用,我在我的本地主机上创建了一个 API,睡眠时间为 500 毫秒(平均处理时间)。请注意,我想在所有 API 调用完成后开始记录并插入数据库。
方法 - 1(使用 threading.Thread 和 queue.Queue())
import requests
import datetime
import threading
import queue
def target(data_q):
while not data_q.empty():
data_q.get()
response = requests.get("https://postman-echo.com/get?foo1=bar1&foo2=bar2")
print(response.status_code)
data_q.task_done()
if __name__ == "__main__":
data_q = queue.Queue()
for i in range(0, 20):
data_q.put(i)
start = datetime.datetime.now()
num_thread = 5
for _ in range(num_thread):
worker = threading.Thread(target=target(data_q))
worker.start()
data_q.join()
print('Time taken multi-threading: '+str(datetime.datetime.now() - start))
我试了5次、10次、20次、30次,结果如下,
多线程所用时间:0:00:06.625710
多线程所用时间:0:00:13.326969
多线程所用时间:0:00:26.435534
多线程所用时间:0:00:40.737406
令我震惊的是,我在没有多线程的情况下尝试了同样的方法,并获得了几乎相同的性能。
然后在谷歌搜索之后,我被介绍到期货模块。
方法 - 2(使用 concurrent.futures)
def fetch_url(im_url):
try:
response = requests.get(im_url)
return response.status_code
except Exception as e:
traceback.print_exc()
if __name__ == "__main__":
data = []
for i in range(0, 20):
data.append(i)
start = datetime.datetime.now()
urls = ["https://postman-echo.com/get?foo1=bar1&foo2=bar2" + str(item) for item in data]
with futures.ThreadPoolExecutor(max_workers=5) as executor:
responses = executor.map(fetch_url, urls)
for ret in responses:
print(ret)
print('Time taken future concurrent: ' + str(datetime.datetime.now() - start))
再进行5、10、20、30次,结果如下,
未来并发所用时间:0:00:01.276891
未来并发所用时间:0:00:02.635949
未来并发所用时间:0:00:05.073299
未来并发所用时间:0:00:07.296873
听说过asyncio,但还没用过。我还读到它提供了比 futures.ThreadPoolExecutor().
更好的性能
最后一个问题,如果两种方法都使用线程(或者我认为如此)那么为什么会有巨大的性能差距?我做错了什么吗?我环顾四周。未能找到令人满意的答案。对此有任何想法将不胜感激。感谢您回答问题。
[编辑 1]整个过程 运行 宁 python 3.8。
[编辑 2] 更新了代码示例和执行时间。现在他们应该 运行 在任何人的系统上。
documentation of ThreadPoolExecutor
详细解释了在未给出 max_workers
参数时启动了多少个线程,如您的示例所示。行为因确切的 Python 版本而异,但启动的任务数很可能超过 3,即使用队列的第一个版本中的线程数。您应该使用 futures.ThreadPoolExecutor(max_workers= 3)
来比较这两种方法。
对于更新的方法 - 1 我建议稍微修改一下 for 循环:
for _ in range(num_thread):
target_to_run= target(data_q)
print('target to run: {}'.format(target_to_run))
worker = threading.Thread(target= target_to_run)
worker.start()
输出将是这样的:
200
...
200
200
target to run: None
target to run: None
target to run: None
target to run: None
target to run: None
Time taken multi-threading: 0:00:10.846368
问题是 Thread
构造函数需要一个可调用对象或 None
作为它的 target
。你没有给它一个可调用的,而是队列处理发生在主线程第一次调用 target(data_q)
时,并且启动了 5 个线程什么都不做,因为它们的 target
是 None
.
我正在尝试使用 python 多线程的各种方法,看看哪种方法符合我的要求。总的来说,我有一堆项目需要发送到 API。然后根据响应,一些项目将进入数据库并记录所有项目;例如,对于一个项目,如果 API returns 成功,该项目将只被记录,但当它 returns 失败时,该项目将被发送到数据库以供将来重试以及记录。
现在,根据 API 响应,我可以将成功项与失败项分开,并对所有失败项进行批量查询,这将提高我的数据库性能。为此,我在一个地方累积所有请求并尝试执行多线程 API 调用(因为这是一个 IO 绑定任务,我什至没有考虑多处理)但同时我需要保持跟踪哪个响应属于哪个请求。
谈到实际问题,我尝试了两种不同的方法,我认为它们可以提供几乎相同的性能,但结果却有很大的不同。
为了模拟 API 调用,我在我的本地主机上创建了一个 API,睡眠时间为 500 毫秒(平均处理时间)。请注意,我想在所有 API 调用完成后开始记录并插入数据库。
方法 - 1(使用 threading.Thread 和 queue.Queue())
import requests
import datetime
import threading
import queue
def target(data_q):
while not data_q.empty():
data_q.get()
response = requests.get("https://postman-echo.com/get?foo1=bar1&foo2=bar2")
print(response.status_code)
data_q.task_done()
if __name__ == "__main__":
data_q = queue.Queue()
for i in range(0, 20):
data_q.put(i)
start = datetime.datetime.now()
num_thread = 5
for _ in range(num_thread):
worker = threading.Thread(target=target(data_q))
worker.start()
data_q.join()
print('Time taken multi-threading: '+str(datetime.datetime.now() - start))
我试了5次、10次、20次、30次,结果如下,
多线程所用时间:0:00:06.625710
多线程所用时间:0:00:13.326969
多线程所用时间:0:00:26.435534
多线程所用时间:0:00:40.737406
令我震惊的是,我在没有多线程的情况下尝试了同样的方法,并获得了几乎相同的性能。
然后在谷歌搜索之后,我被介绍到期货模块。
方法 - 2(使用 concurrent.futures)
def fetch_url(im_url):
try:
response = requests.get(im_url)
return response.status_code
except Exception as e:
traceback.print_exc()
if __name__ == "__main__":
data = []
for i in range(0, 20):
data.append(i)
start = datetime.datetime.now()
urls = ["https://postman-echo.com/get?foo1=bar1&foo2=bar2" + str(item) for item in data]
with futures.ThreadPoolExecutor(max_workers=5) as executor:
responses = executor.map(fetch_url, urls)
for ret in responses:
print(ret)
print('Time taken future concurrent: ' + str(datetime.datetime.now() - start))
再进行5、10、20、30次,结果如下,
未来并发所用时间:0:00:01.276891
未来并发所用时间:0:00:02.635949
未来并发所用时间:0:00:05.073299
未来并发所用时间:0:00:07.296873
听说过asyncio,但还没用过。我还读到它提供了比 futures.ThreadPoolExecutor().
更好的性能最后一个问题,如果两种方法都使用线程(或者我认为如此)那么为什么会有巨大的性能差距?我做错了什么吗?我环顾四周。未能找到令人满意的答案。对此有任何想法将不胜感激。感谢您回答问题。
[编辑 1]整个过程 运行 宁 python 3.8。 [编辑 2] 更新了代码示例和执行时间。现在他们应该 运行 在任何人的系统上。
documentation of ThreadPoolExecutor
详细解释了在未给出 max_workers
参数时启动了多少个线程,如您的示例所示。行为因确切的 Python 版本而异,但启动的任务数很可能超过 3,即使用队列的第一个版本中的线程数。您应该使用 futures.ThreadPoolExecutor(max_workers= 3)
来比较这两种方法。
对于更新的方法 - 1 我建议稍微修改一下 for 循环:
for _ in range(num_thread):
target_to_run= target(data_q)
print('target to run: {}'.format(target_to_run))
worker = threading.Thread(target= target_to_run)
worker.start()
输出将是这样的:
200
...
200
200
target to run: None
target to run: None
target to run: None
target to run: None
target to run: None
Time taken multi-threading: 0:00:10.846368
问题是 Thread
构造函数需要一个可调用对象或 None
作为它的 target
。你没有给它一个可调用的,而是队列处理发生在主线程第一次调用 target(data_q)
时,并且启动了 5 个线程什么都不做,因为它们的 target
是 None
.