Python - IO 的多处理和线程
Python - Multiprocessing and Threads for IO
你好,我想举一个例子,将使用线程的非异步代码转换为同时使用线程的代码。
我的目标:
产生 4 个进程,并且每个进程同时产生 10 个线程。
import requests
import multiprocessing
from concurrent import futures
def poll_data_1(data):
response = requests.get('https://breadcrumbscollector.tech/feed/')
print(f'Got data of length: {len(response.content)} in just {response.elapsed}')
def thread_set(data):
max_workers = 10
concurrent = futures.ThreadPoolExecutor(max_workers)
with concurrent as ex:
ex.map(poll_data_1, data)
data =range(40)
data1 =[]
for l in data:
data1.append([l])
# Mutliprocessing
with multiprocessing.Pool(processes=4, maxtasksperchild=1) as pool:
pool.imap_unordered(thread_set, data1)
pool.close()
pool.join()
所以这段代码 "Works" 但看起来它一次只打开 1 个进程。所以10个线程会运行,比10个多。我的目标是一次 运行 所有 40 个线程。
我尝试这样做的原因是我的实际应用程序正在尝试执行 8,000-14,000 个 IO 绑定请求。所以线程并没有扩展那么高。如果我可以说让我的真实服务器打开 process=to CPU,并且每个进程产生 1000 个线程,我认为它会更好。
或者我超级错了...谢谢!
您需要一个循环来阻止主线程关闭池,直到所有作业完成。
替换
pool.imap_unordered(thread_set, data1)
与
for result in pool.imap_unordered(thread_set, data1):
pass
然后 运行 又是你的例子。
另外你不需要:
pool.close()
pool.join()
因为 with 语句会自动执行此操作。
你好,我想举一个例子,将使用线程的非异步代码转换为同时使用线程的代码。
我的目标: 产生 4 个进程,并且每个进程同时产生 10 个线程。
import requests
import multiprocessing
from concurrent import futures
def poll_data_1(data):
response = requests.get('https://breadcrumbscollector.tech/feed/')
print(f'Got data of length: {len(response.content)} in just {response.elapsed}')
def thread_set(data):
max_workers = 10
concurrent = futures.ThreadPoolExecutor(max_workers)
with concurrent as ex:
ex.map(poll_data_1, data)
data =range(40)
data1 =[]
for l in data:
data1.append([l])
# Mutliprocessing
with multiprocessing.Pool(processes=4, maxtasksperchild=1) as pool:
pool.imap_unordered(thread_set, data1)
pool.close()
pool.join()
所以这段代码 "Works" 但看起来它一次只打开 1 个进程。所以10个线程会运行,比10个多。我的目标是一次 运行 所有 40 个线程。
我尝试这样做的原因是我的实际应用程序正在尝试执行 8,000-14,000 个 IO 绑定请求。所以线程并没有扩展那么高。如果我可以说让我的真实服务器打开 process=to CPU,并且每个进程产生 1000 个线程,我认为它会更好。
或者我超级错了...谢谢!
您需要一个循环来阻止主线程关闭池,直到所有作业完成。
替换
pool.imap_unordered(thread_set, data1)
与
for result in pool.imap_unordered(thread_set, data1):
pass
然后 运行 又是你的例子。
另外你不需要:
pool.close()
pool.join()
因为 with 语句会自动执行此操作。