如何实时存储concurrent.futures ProcessPoolExecutor HTTP 响应和进程?

How to store concurrent.futures ProcessPoolExecutor HTTP responses and process in real time?

我有一个正在处理的项目,我希望使用 concurrent.futures ProcessPoolExecutor 发送大量 HTTP 请求。虽然下面的代码非常适合获取请求,但我正在努力寻找处理信息的想法。当我收到响应时,我尝试将它插入到 sqlite3 数据库中,但是尝试管理锁和避免使用全局变量变得很棘手。

理想情况下,我想启动 Pool,并在它执行时能够 read/store 数据。这是可能的还是我应该采取不同的路线...

pool = ProcessPoolExecutor(max_workers=60)
results = list(pool.map(http2_get, urls))

def http2_get(url):
    while(True):
        try:
            start_time = millis()
            result = s.get(url,verify=False)
            print(url + " Total took " + str(millis() - start_time) + " ms")
            return result
        except Exception as e:
            print(e,e.__traceback__.tb_lineno)
            pass

如您所见,map 在所有进程完成之前不会 return。我假设你想在主进程中处理数据。

而不是使用 map,提交所有任务和 process them as they finish:

from concurrent.futures import ProcessPoolExecutor, as_completed

pool = ProcessPoolExecutor(max_workers=60)
futures_list = [pool.submit(http2_get, url) for url in urls]

for future in as_completed(futures_list):
    exception = future.exception()
    if exception is not None:
        # Handle exception in http2_get
        pass
    else:
        result = future.result()
        # process result...

请注意,使用 ProcessPoolExecutor 作为上下文管理器更简洁:

with ProcessPoolExecutor(max_workers=60) as pool:
    futures_list = [pool.submit(http2_get, url) for url in urls]
    
    for future in as_completed(futures_list):
        exception = future.exception()
        if exception is not None:
            # Handle exception in htt2_get
            pass
        else:
            result = future.result()
            # process result...