Python 3: Multiprocessing API calls with exit condition
I'm trying to write an application that works through a list of database entries, uses those entries to make API calls, and collects the responses whose JSON value for subscribed is True; as soon as 5 such calls have come back, I want the list of those 5 responses. Since the database has a few thousand entries, I want to do this with multiprocessing. But I'm a beginner at parallelization, and I can't seem to understand how it works or how to set up the exit condition. Here is what I have:
from multiprocessing.dummy import Pool
import requests

def get_api_response(apikey, result, subscription_id):
    r = requests.get("https://api.example.com/" + subscription_id)
    if r.json()['subscribed'] == True:
        result.append(r.json())
    return result

def pass_args(args):
    foo = get_api_response(*args)
    if foo:
        return foo

def check_response_amount(result):
    if len(result) >= 5:
        pool.terminate()

# One entry looks like that: {"id": 1, "name": "smith", "subscription_id": 123}
db_entries = get_db_entries()
apikey = 'abcd1234'
result = []
request_tuples = [(apikey, result, entry['subscription_id']) for entry in db_entries]

pool = Pool(5)
pool_result = pool.map_async(pass_args, request_tuples, callback=check_response_amount)
pool_result.wait()
pool.close()
pool.join()
The application checks every database entry and returns every API response where subscribed == True, without ever running through the callback. I tried applying the answer from another question (Python Multiprocessing help exit on condition), but I couldn't get it to work. Can anyone help me?
When you use map_async, the callback isn't executed until every work item in the iterable has completed. If you want the callback to execute for each item in request_tuples, rather than only once all of them are finished, you need to use apply_async inside a for loop instead:
results = []
for item in request_tuples:
    results.append(pool.apply_async(get_api_response, args=item, callback=check_response_amount))

for result in results:
    result.wait()
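The difference between the two callback behaviors can be seen in a minimal, self-contained sketch (squaring numbers instead of making HTTP requests; the names here are illustrative, not from the original code):

```python
from multiprocessing.dummy import Pool  # same thread-based pool as above

calls = []  # records each invocation of the callback

def record(value):
    calls.append(value)

# map_async: the callback fires once, with the complete result list.
with Pool(2) as pool:
    pool.map_async(lambda x: x * x, [1, 2, 3], callback=record).wait()
map_calls = list(calls)  # a single call, carrying the whole list [1, 4, 9]

# apply_async: the callback fires once per submitted item.
calls.clear()
with Pool(2) as pool:
    handles = [pool.apply_async(lambda x: x * x, (x,), callback=record)
               for x in [1, 2, 3]]
    for h in handles:
        h.wait()
# calls now holds 1, 4, 9 in completion order -- three separate invocations
```

This is exactly why check_response_amount never fires early in the original code: with map_async it only ever sees the finished list.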
Additionally, calling pool.terminate won't work the way you want: once you call it, the items you've already submitted to the pool will hang forever, which will make your script hang, since you're waiting on them to finish before exiting. You can work around this by just waiting for the pool to join, instead of actually waiting on any individual task to finish.
import time
from multiprocessing.dummy import Pool

def get_api_response(apikey, result, subscription_id):
    url = ("https://api.example.com/" + str(subscription_id))
    time.sleep(2)
    result.append(url)
    return result

def pass_args(args):
    foo = get_api_response(*args)
    if foo:
        return foo

def check_response_amount(result):
    print("IN HERE")
    if result and len(result) >= 5:
        print("DONE %s" % result)
        pool.terminate()

def get_db_entries():
    return [{'subscription_id': i} for i in range(100)]

# One entry looks like that: {"id": 1, "name": "smith", "subscription_id": 123}
db_entries = get_db_entries()
apikey = 'abcd1234'
result = []
request_tuples = [(apikey, result, entry['subscription_id']) for entry in db_entries]

pool = Pool(2)
results = []
for item in request_tuples:
    results.append(pool.apply_async(get_api_response, item, callback=check_response_amount))
pool.close()
pool.join()
print("done")
Output:
IN HERE
IN HERE
IN HERE
IN HERE
IN HERE
... (a bunch more of this)...
IN HERE
IN HERE
DONE ['https://api.example.com/1', 'https://api.example.com/0', 'https://api.example.com/2', 'https://api.example.com/3', 'https://api.example.com/4', 'https://api.example.com/5']
done
Note that the result list may end up a little larger than you want, since the terminate call doesn't actually stop in-progress tasks.
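As a hedged alternative sketch (not from the answer above): instead of terminating the pool from a callback, you can iterate over results as they complete with imap_unordered and simply stop once enough have arrived, which avoids both the hang and the overshoot. The URL and the threshold of 5 are placeholders carried over from the discussion:

```python
from multiprocessing.dummy import Pool  # thread-based pool, as in the question

def fetch(subscription_id):
    # Stand-in for the real requests.get(...) call; returns the URL
    # it would have requested.
    return "https://api.example.com/" + str(subscription_id)

subscription_ids = range(100)
wanted = 5
results = []

# imap_unordered yields results in completion order; leaving the
# with-block terminates the pool, discarding the remaining work.
with Pool(2) as pool:
    for value in pool.imap_unordered(fetch, subscription_ids):
        results.append(value)
        if len(results) >= wanted:
            break

print(len(results))  # exactly 5 -- no overshoot
```

Because the loop breaks as soon as the fifth result is appended, results contains exactly 5 entries rather than "a little more than 5".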