Python 3: Multiprocessing API calls with exit condition

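I'm trying to write an application that works through a list of database entries and makes an API call for each one. As soon as a particular value in the API's JSON response is True for 5 of the calls, I want to get back the list of those 5 responses. Since the database holds a couple of thousand entries, I want to do this with multiprocessing. But I'm a beginner with parallelization, and I can't seem to grasp how it works or how to set the exit condition. This is what I have so far: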

from multiprocessing.dummy import Pool
import requests

def get_api_response(apikey, result, subscription_id):
    r = requests.get("https://api.example.com/" + subscription_id)
    if r.json()['subscribed'] == True:
        result.append(r.json())
        return result

def pass_args(args):
    foo = get_api_response(*args)
    if foo:
        return foo

def check_response_amount(result):
    if len(result) >= 5:
        pool.terminate() 

# One entry looks like that: {"id": 1, "name": "smith", "subscription_id": 123}
db_entries = get_db_entries()
apikey = 'abcd1234'
result = []
request_tuples = [(apikey, result, entry['subscription_id']) for entry in db_entries]
pool = Pool(5)
pool_result = pool.map_async(pass_args, request_tuples, callback=check_response_amount)
pool_result.wait()
pool.close()
pool.join()

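The application checks every database entry and returns every API response where subscribed == True, without ever running the callback along the way. I tried applying the answer from another question (Python Multiprocessing help exit on condition), but couldn't get it to work. Can someone help me?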

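When you use map_async, the callback isn't executed until every work item in the iterable has completed. If you want the callback to run for each item in request_tuples, rather than only once after all items are done, you need to use apply_async in a for loop instead: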

results = []
for item in request_tuples:
    results.append(pool.apply_async(get_api_response, args=item, callback=check_response_amount))

for result in results:
    result.wait()
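To make the difference in callback timing concrete, here is a minimal sketch. The square worker and the two lists are hypothetical stand-ins for illustration only; they are not part of the question's code.

```python
from multiprocessing.dummy import Pool  # thread-based pool, as in the question

def square(x):
    # Hypothetical stand-in for the API call
    return x * x

map_calls = []    # collects every invocation of the map_async callback
apply_calls = []  # collects every invocation of the apply_async callbacks

with Pool(3) as pool:
    # map_async: the callback fires once, with the full list of results
    pool.map_async(square, range(4), callback=map_calls.append).wait()

    # apply_async: the callback fires once per submitted task
    pending = [pool.apply_async(square, (i,), callback=apply_calls.append)
               for i in range(4)]
    for p in pending:
        p.wait()

print(len(map_calls), map_calls)              # 1 [[0, 1, 4, 9]]
print(len(apply_calls), sorted(apply_calls))  # 4 [0, 1, 4, 9]
```

The map_async callback sees one list containing everything, so a check like len(result) >= 5 can only run after all the work is already done. The apply_async callbacks arrive one by one, which is what an early exit needs.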

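Also, calling pool.terminate won't work quite the way you want. Once you call it, the items you've already submitted to the pool will hang forever, and that makes your script hang too, because you're waiting on those items to finish before exiting. You can work around this by waiting for the pool to join, rather than waiting on any individual task to finish.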

import time
from multiprocessing.dummy import Pool

def get_api_response(apikey, result, subscription_id):
    # Simulate a slow API call instead of hitting the network
    url = "https://api.example.com/" + str(subscription_id)
    time.sleep(2)
    result.append(url)
    return result

def pass_args(args):
    foo = get_api_response(*args)
    if foo:
        return foo

def check_response_amount(result):
    if result and len(result) >= 5:
        print("DONE %s" % result)
        pool.terminate()


def get_db_entries():
    return [{'subscription_id' : i} for i in range(100)]

# One entry looks like that: {"id": 1, "name": "smith", "subscription_id": 123}
db_entries = get_db_entries()
apikey = 'abcd1234'
result = []
request_tuples = [(apikey, result, entry['subscription_id']) for entry in db_entries]
pool = Pool(2)
results = []
for item in request_tuples:
    results.append(pool.apply_async(get_api_response, item, callback=check_response_amount))
pool.close()
pool.join()
print("done")

Output:

IN HERE
IN HERE
IN HERE
IN HERE
IN HERE
... (a bunch more of this)...
IN HERE
IN HERE
DONE ['https://api.example.com/1', 'https://api.example.com/0', 'https://api.example.com/2', 'https://api.example.com/3', 'https://api.example.com/4', 'https://api.example.com/5']
done

Note that the result list may end up slightly larger than you wanted, because the terminate call doesn't actually stop tasks that are already in progress.
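If you need exactly five entries, one possible alternative (a sketch, not what the code above does) is to collect the results in the main thread with imap_unordered and stop iterating once enough matches have arrived. The fetch function below is a hypothetical stand-in for the real API call and pretends that only even subscription IDs are subscribed; first_subscribed and WANTED are likewise names made up for this example.

```python
from multiprocessing.dummy import Pool  # thread-based pool, as in the question

WANTED = 5  # stop after this many positive responses

def fetch(subscription_id):
    # Hypothetical stand-in for the real API call: pretend that only
    # even subscription IDs come back with 'subscribed' set to True.
    return {"subscription_id": subscription_id,
            "subscribed": subscription_id % 2 == 0}

def first_subscribed(subscription_ids, wanted=WANTED, workers=5):
    matches = []
    with Pool(workers) as pool:
        # imap_unordered yields each response as soon as it is ready
        for response in pool.imap_unordered(fetch, subscription_ids):
            if response["subscribed"]:
                matches.append(response)
                if len(matches) >= wanted:
                    break  # leaving the with-block terminates the pool
    return matches

found = first_subscribed(range(100))
print(len(found))  # 5
```

Because only the main thread appends to matches, the list can never grow past the limit. Calls that are already in flight when the loop breaks still run to completion, but their results are simply not collected.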