在 python 到 return 值中使用多处理

Using multiprocessing in python to return values

背景

我现在有一些代码看起来像这样。

failed_player_ids: Set[str] = set()
for player_id in player_ids:
    success = player_api.send_results(
        player_id, user=user, send_health_results=True
    )
    if not success:
        failed_player_ids.add(player_id)

此代码运行良好,但问题是每次调用需要 5 秒。有每分钟 2000 次呼叫的速率限制,所以我远远低于最大容量。我想将其并行化以加快速度。这是我第一次在 python 中使用 multiprocessing 库,因此我对应该如何进行感到有点困惑。我可以用语言描述我想做的事情。

在我当前的代码中,我循环遍历 player_id 列表,如果 api 响应成功,我什么也不做,如果失败,我记下玩家 ID。

我不确定如何实现此代码的并行版本。我有一些想法,但我有点困惑。

这是我目前的想法

from multiprocessing import Pool


    
    num_processors_to_use = 5 # This is a number can be increased to get more speed
    
    def send_player_result(player_id_list: List[str]) -> Optional[str]:
        for player_id in player_id_list:
            success = player_api.send_results(player_id, user=user, send_health_results=True)
            if not success:
                return player_id
    # Caller
    with Pool(processes=num_processors_to_use) as pool:
            responses = pool.map(
                func=send_player_result,
                iterable=player_id_list,
            )
            failed_player_ids = Set(responses)

 

任何意见和建议都会有所帮助。

如果你正在使用函数 map,那么 iterable player_id_list 的每个 item 将被传递作为单独的任务来运行 send_player_result。因此,此函数不应再期望传递玩家 ID 列表,而是传递单个玩家 ID。而且,如您现在所知,如果您的任务主要 I/O 绑定,那么多线程是更好的模型。您可以:

from multiprocessing.dummy import Pool
# or
from multiprocessing.pool import ThreadPool

您可能希望大大增加线程数(但不要大于 player_id_list 的大小):

#from multiprocessing import Pool
from multiprocessing.dummy import Pool
from typing import Set

def send_player_result(player_id):
    success = player_api.send_results(player_id, user=user, send_health_results=True)
    return success

# Only required for Windows if you are doing multiprocessing:
if __name__ == '__main__':
    
    pool_size = 5 # This is a number can be increased to get more concurrency
    
    # Caller
    failed_player_ids: Set[str] = set()
    with Pool(pool_size) as pool:
        results = pool.map(func=send_player_result, iterable=player_id_list)
        for idx, success in enumerate(results):
            if not success:
                # failed for argument player_id_list[idx]:
                failed_player_ids.add(player_id_list[idx])