How to store and process concurrent.futures ProcessPoolExecutor HTTP responses in real time?
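I am working on a project where I want to use concurrent.futures ProcessPoolExecutor to send a large number of HTTP requests. The code below works well for making the requests, but I am struggling to find a way to process the results. I tried inserting each response into a sqlite3 database as it arrived, but managing locks and avoiding global variables became tricky.
Ideally, I would like to start the pool and be able to read/store data while it is executing. Is this possible, or should I take a different route?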
    pool = ProcessPoolExecutor(max_workers=60)
    results = list(pool.map(http2_get, urls))

    def http2_get(url):
        while(True):
            try:
                start_time = millis()
                result = s.get(url,verify=False)
                print(url + " Total took " + str(millis() - start_time) + " ms")
                return result
            except Exception as e:
                print(e,e.__traceback__.tb_lineno)
                pass
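As written, list(pool.map(...)) does not return until every task has finished, and map hands results back in submission order rather than completion order. I assume you want to process the data in the main process.
Instead of using map, submit all the tasks and process them as they finish: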
    from concurrent.futures import ProcessPoolExecutor, as_completed

    pool = ProcessPoolExecutor(max_workers=60)
    futures_list = [pool.submit(http2_get, url) for url in urls]
    for future in as_completed(futures_list):
        exception = future.exception()
        if exception is not None:
            # Handle exception in http2_get
            pass
        else:
            result = future.result()
            # process result...
Note that using ProcessPoolExecutor as a context manager is cleaner:
    with ProcessPoolExecutor(max_workers=60) as pool:
        futures_list = [pool.submit(http2_get, url) for url in urls]
        for future in as_completed(futures_list):
            exception = future.exception()
            if exception is not None:
                # Handle exception in http2_get
                pass
            else:
                result = future.result()
                # process result...
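Since the question mentions sqlite3, here is a minimal sketch of what the "process result" step could look like when only the main process writes to the database, which sidesteps the locking and global-variable problems. The helper name store_as_completed, the responses table and its columns are hypothetical, and the sketch assumes the worker function returns a plain (status, body) tuple rather than a full response object, so that the result pickles cheaply between processes.

```python
import sqlite3
from concurrent.futures import as_completed


def store_as_completed(executor, fetch, urls, conn):
    """Submit fetch(url) for every URL and insert each result as it finishes.

    Only the calling (main) process touches the database, so the workers
    need no locks and no shared globals. Returns the number of rows written.
    """
    # Hypothetical schema: one row per URL, with either a result or an error.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS responses "
        "(url TEXT PRIMARY KEY, status INTEGER, body TEXT, error TEXT)"
    )
    # Map each future back to its URL so failures can be recorded too.
    future_to_url = {executor.submit(fetch, url): url for url in urls}
    stored = 0
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        exception = future.exception()
        if exception is not None:
            row = (url, None, None, repr(exception))
        else:
            # Assumption: fetch returns a (status, body) tuple.
            status, body = future.result()
            row = (url, status, body, None)
        conn.execute("INSERT OR REPLACE INTO responses VALUES (?, ?, ?, ?)", row)
        conn.commit()  # commit per row so the data is readable while the pool runs
        stored += 1
    return stored
```

With the pool from the answer, this would be called inside the with block as store_as_completed(pool, http2_get, urls, sqlite3.connect("responses.db")), where the database file name is again only a placeholder and http2_get would need to return a (status, body) tuple. Committing after every row keeps the data visible in real time; batching several rows per commit would likely be faster if that matters more.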