Python:用于多处理的 Joblib

Python: Joblib for multiprocessing

所以我有这些给定的功能:

def make_event_df(match_id, path):
    '''
    Function for making event dataframe.
    
    Argument:
        match_id -- int, the required match id for which event data will be constructed.
        path -- str, path to .json file containing event data.
    
    Returns:
        df -- pandas dataframe, the event dataframe for the particular match.
    '''
    ## read in the json file
    event_json = json.load(open(path, encoding='utf-8'))
    
    ## normalize the json data
    df = json_normalize(event_json, sep='_')
    
    return df

def full_season_events(comp_name, match_df, match_ids, path):
    '''
    Function to make event dataframe for a full season.
    
    Arguments:
        comp_name -- str, competition name + season name
        match_df -- pandas dataframe, containing match-data
        match_id -- list, list of match id.
        path -- str, path to directory where .json file is listed.
                e.g. '../input/Statsbomb/data/events'
    
    Returns:
        event_df -- pandas dataframe, containing event data for the whole season.
    '''
    ## init an empty dataframe
    event_df = pd.DataFrame()

    for match_id in tqdm(match_ids, desc=f'Making Event Data For {comp_name}'):
        ## .json file
        temp_path = path + f'/{match_id}.json'

        temp_df = make_event_df(match_id, temp_path)
        event_df = pd.concat([event_df, temp_df], sort=True)
        
    return event_df   

现在我运行这段获取dataframe的代码:

comp_id = 11
season_id = 1
path = f'../input/Statsbomb/data/matches/{comp_id}/{season_id}.json'

match_df = get_matches(comp_id, season_id, path)

comp_name = match_df['competition_name'].unique()[0] + '-' + match_df['season_name'].unique()[0]
match_ids = list(match_df['match_id'].unique())
path = f'../input/Statsbomb/data/events'

event_df = full_season_events(comp_name, match_df, match_ids, path)

上面的代码片段给我这个输出:

Making Event Data For La Liga-2017/2018: 100%|██████████| 36/36 [00:29<00:00,  1.20it/s]

如何使用多处理来加快处理速度,即如何使用 full_season_events() 中的 match_ids 以更快的方式从 JSON 文件中获取数据(使用多处理)。我对 joblib 和多处理概念很陌生。谁能告诉我必须对这些函数进行哪些更改才能获得所需的结果?

这里不需要 joblib,直接 multiprocessing 即可。

  • 我正在使用 imap_unordered,因为它比 imapmap 快,但不保留顺序(每个工作人员都可以乱序接收和提交作业)。不保留顺序似乎无关紧要,因为无论如何您都在 sort=Trueing。
    • 因为我正在使用 imap_unordered,所以需要额外的 jobs 欺骗;没有 istarmap_unordered 可以解压参数,所以我们需要自己做。
  • 如果您有很多 match_ids,可以加快速度,例如chunksize=10imap_unordered;这意味着每个工作进程将一次被提供 10 个工作,并且他们也会一次 return 10 个工作。它更快,因为在进程同步和序列化上花费的时间更少,但另一方面,TQDM 进度条更新的频率更低。

像往常一样,下面的代码是 dry-coded 并且可能无法运行 OOTB。

import multiprocessing


def make_event_df(job):
    # Unpack parameters from job tuple
    match_id, path = job
    with open(path) as f:
        event_json = json.load(f)
    # Return the match id (if required) and the result.
    return (match_id, json_normalize(event_json, sep="_"))


def full_season_events(comp_name, match_df, match_ids, path):
    event_df = pd.DataFrame()

    with multiprocessing.Pool() as p:
        # Generate job tuples
        jobs = [(match_id, path + f"/{match_id}.json") for match_id in match_ids]
        # Run & get results from multiprocessing generator
        for match_id, temp_df in tqdm(
            p.imap_unordered(make_event_df, jobs),
            total=len(jobs),
            desc=f"Making Event Data For {comp_name}",
        ):
            event_df = pd.concat([event_df, temp_df], sort=True)

    return event_df