Using multiprocessing in python to return values
Background
I currently have some code that looks like this:
failed_player_ids: Set[str] = set()
for player_id in player_ids:
    success = player_api.send_results(
        player_id, user=user, send_health_results=True
    )
    if not success:
        failed_player_ids.add(player_id)
This code works fine, but the problem is that each call takes 5 seconds. There is a rate limit of 2000 calls per minute, so I am well below maximum capacity. I want to parallelize this to speed it up. This is my first time using the multiprocessing library in python, so I am a little confused about how to proceed. I can describe what I want to do in words.

In my current code, I loop through the list of player_ids; if the api response is successful I do nothing, and if it fails, I make a note of that player ID.

I am not sure how to implement a parallel version of this code. I have some ideas, but I am a bit confused.

Here is what I have so far:

from multiprocessing import Pool

num_processors_to_use = 5  # This is a number that can be increased to get more speed

def send_player_result(player_id_list: List[str]) -> Optional[str]:
    for player_id in player_id_list:
        success = player_api.send_results(player_id, user=user, send_health_results=True)
        if not success:
            return player_id

# Caller
with Pool(processes=num_processors_to_use) as pool:
    responses = pool.map(
        func=send_player_result,
        iterable=player_id_list,
    )
failed_player_ids = Set(responses)
Any comments or suggestions would be helpful.
If you are using the map function, then each item of the iterable player_id_list will be passed as a separate task to run send_player_result. Therefore, this function should no longer expect to be passed a list of player IDs, but rather a single player ID. And since, as you now realize, your task is mostly I/O bound, multithreading is the better model here. You could either:
from multiprocessing.dummy import Pool
# or
from multiprocessing.pool import ThreadPool
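For what it's worth, these two imports give you the same thing. Here is a minimal check (standard library only, nothing else assumed) showing that multiprocessing.dummy.Pool just constructs a ThreadPool under the hood:

from multiprocessing.dummy import Pool
from multiprocessing.pool import ThreadPool

# multiprocessing.dummy mirrors the multiprocessing API but is backed by
# threads; its Pool() factory simply builds and returns a ThreadPool:
with Pool(4) as pool:
    print(isinstance(pool, ThreadPool))  # True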
You will probably want to greatly increase the number of threads (but not beyond the size of player_id_list):
#from multiprocessing import Pool
from multiprocessing.dummy import Pool
from typing import Set

def send_player_result(player_id):
    success = player_api.send_results(player_id, user=user, send_health_results=True)
    return success

# Only required for Windows if you are doing multiprocessing:
if __name__ == '__main__':
    pool_size = 5  # This is a number that can be increased to get more concurrency

    # Caller
    failed_player_ids: Set[str] = set()
    with Pool(pool_size) as pool:
        results = pool.map(func=send_player_result, iterable=player_id_list)
        for idx, success in enumerate(results):
            if not success:
                # failed for argument player_id_list[idx]:
                failed_player_ids.add(player_id_list[idx])
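As a variation on the same idea, and closer to the Optional[str] signature in your original attempt, the worker could return the failing ID itself rather than a bool, which removes the index bookkeeping in the caller. This is only a sketch reusing the player_api, user, and player_id_list names from your question; the pool size of 32 is an arbitrary assumption to tune against your rate limit:

from multiprocessing.dummy import Pool
from typing import Optional, Set

def send_player_result(player_id: str) -> Optional[str]:
    # Return the ID on failure and None on success, so the caller can
    # collect failures directly instead of indexing back into the input.
    success = player_api.send_results(player_id, user=user, send_health_results=True)
    return None if success else player_id

with Pool(32) as pool:  # assumed thread count
    failed_player_ids: Set[str] = {
        pid
        for pid in pool.imap_unordered(send_player_result, player_id_list)
        if pid is not None
    }

Since the results are collected into a set, the arbitrary completion order of imap_unordered does not matter here, and it lets fast calls finish without waiting behind slow ones.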