Python: Joblib for multiprocessing
So I have these given functions:
import json
import pandas as pd
from pandas import json_normalize
from tqdm import tqdm

def make_event_df(match_id, path):
    '''
    Function for making the event dataframe.

    Arguments:
    match_id -- int, the required match id for which event data will be constructed.
    path -- str, path to the .json file containing event data.

    Returns:
    df -- pandas dataframe, the event dataframe for the particular match.
    '''
    ## read in the json file
    event_json = json.load(open(path, encoding='utf-8'))

    ## normalize the json data
    df = json_normalize(event_json, sep='_')

    return df
def full_season_events(comp_name, match_df, match_ids, path):
    '''
    Function to make the event dataframe for a full season.

    Arguments:
    comp_name -- str, competition name + season name.
    match_df -- pandas dataframe, containing match data.
    match_ids -- list, list of match ids.
    path -- str, path to the directory where the .json files are located,
            e.g. '../input/Statsbomb/data/events'

    Returns:
    event_df -- pandas dataframe, containing event data for the whole season.
    '''
    ## init an empty dataframe
    event_df = pd.DataFrame()

    for match_id in tqdm(match_ids, desc=f'Making Event Data For {comp_name}'):
        ## path to the .json file for this match
        temp_path = path + f'/{match_id}.json'
        temp_df = make_event_df(match_id, temp_path)
        event_df = pd.concat([event_df, temp_df], sort=True)

    return event_df
Now I run this code to get the dataframe:
comp_id = 11
season_id = 1
path = f'../input/Statsbomb/data/matches/{comp_id}/{season_id}.json'
match_df = get_matches(comp_id, season_id, path)
comp_name = match_df['competition_name'].unique()[0] + '-' + match_df['season_name'].unique()[0]
match_ids = list(match_df['match_id'].unique())
path = '../input/Statsbomb/data/events'
event_df = full_season_events(comp_name, match_df, match_ids, path)
The above snippet gives me this output:
Making Event Data For La Liga-2017/2018: 100%|██████████| 36/36 [00:29<00:00, 1.20it/s]
How can I use multiprocessing to speed this up, i.e. how can I use the match_ids in full_season_events() to fetch the data from the JSON files faster (using multiprocessing)? I am new to joblib and to multiprocessing concepts in general. Can anyone tell me what changes I have to make to these functions to get the desired result?
You don't need joblib here; plain multiprocessing will do. (For comparison, a joblib equivalent is sketched at the end.)
- I'm using imap_unordered because it's faster than imap or map, but it doesn't retain order (each worker can receive and return jobs out of order). Not retaining order doesn't seem to matter here, since you're sort=True-ing the concat anyway.
- Because I'm using imap_unordered, the extra jobs trickery is needed; there is no istarmap_unordered that would unpack the arguments for us, so we do it ourselves.
- If you have a lot of match_ids, you can speed things up by passing e.g. chunksize=10 to imap_unordered (see the self-contained sketch after this list); it means each worker process is fed 10 jobs at a time, and they also return 10 jobs at a time. It's faster because less time is spent on inter-process synchronization and serialization, but on the other hand the TQDM progress bar updates less often.
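A minimal, self-contained illustration of the chunksize effect (toy function and numbers, not the answer's code):

import multiprocessing

def square(x):
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool() as p:
        # With chunksize=10 each worker receives 10 jobs per dispatch, so there
        # are fewer synchronization/serialization round-trips; results (and any
        # progress-bar updates) also come back in batches of 10.
        results = list(p.imap_unordered(square, range(100), chunksize=10))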
As usual, the code below is dry-coded and may not work out of the box.
import multiprocessing

def make_event_df(job):
    # Unpack parameters from the job tuple
    match_id, path = job
    with open(path) as f:
        event_json = json.load(f)
    # Return the match id (if required) and the result.
    return (match_id, json_normalize(event_json, sep="_"))

def full_season_events(comp_name, match_df, match_ids, path):
    event_df = pd.DataFrame()
    with multiprocessing.Pool() as p:
        # Generate job tuples
        jobs = [(match_id, path + f"/{match_id}.json") for match_id in match_ids]
        # Run & get results from the multiprocessing generator
        for match_id, temp_df in tqdm(
            p.imap_unordered(make_event_df, jobs),
            total=len(jobs),
            desc=f"Making Event Data For {comp_name}",
        ):
            event_df = pd.concat([event_df, temp_df], sort=True)
    return event_df
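One usage note: multiprocessing.Pool starts fresh interpreter processes on Windows and recent macOS (spawn instead of fork), so the driver code should sit under an if __name__ == "__main__": guard, otherwise each worker re-runs it on import. A sketch reusing the question's own driver (get_matches and the paths are from the question):

if __name__ == "__main__":
    comp_id, season_id = 11, 1
    path = f'../input/Statsbomb/data/matches/{comp_id}/{season_id}.json'
    match_df = get_matches(comp_id, season_id, path)
    comp_name = match_df['competition_name'].unique()[0] + '-' + match_df['season_name'].unique()[0]
    match_ids = list(match_df['match_id'].unique())
    event_df = full_season_events(comp_name, match_df, match_ids, '../input/Statsbomb/data/events')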
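And since the question asks about joblib specifically: the same fan-out can be written with joblib's Parallel/delayed. A minimal sketch under the same assumptions (make_event_df is the tuple-taking version above; full_season_events_joblib is a name made up for illustration; note the progress bar here tracks job dispatch, not completion):

from joblib import Parallel, delayed

def full_season_events_joblib(comp_name, match_df, match_ids, path):
    jobs = [(match_id, path + f"/{match_id}.json") for match_id in match_ids]
    # n_jobs=-1 uses all available cores; joblib returns results in submission order.
    results = Parallel(n_jobs=-1)(
        delayed(make_event_df)(job)
        for job in tqdm(jobs, desc=f"Making Event Data For {comp_name}")
    )
    return pd.concat([df for _, df in results], sort=True)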