函数的 Joblib 并行计算
Joblib parallel computation for a function
如何使用 joblib 并行化此函数?计算发生在 for 循环内部。
# LOT_RUNNUM identifiers whose SENSOR_VALUE traces are to be collected.
lotrunnums = [
    'RX9OE_29756162',
    'S009K_29952685',
    '1P32G_29692263',
    '721YA_29780543',
    '6S3R6_29759571',
    'RX9D0_29865357',
    'RX9PV_29751006',
    'RX9QM_29794268',
]
def get_data_to_dict(data_train, lotrunnums):
    """Collect the SENSOR_VALUE trace of every requested LOT_RUNNUM.

    Parameters
    ----------
    data_train : pandas.DataFrame
        Frame with ``"LOT_RUNNUM"`` and ``"SENSOR_VALUE"`` columns.
    lotrunnums : iterable
        LOT_RUNNUM values to extract.

    Returns
    -------
    dict
        ``{lotrunnum: numpy.ndarray}`` with keys in ``lotrunnums`` order.
        Each array holds the SENSOR_VALUE entries of the matching rows in
        row order; an id that never occurs maps to an empty array.

    The elapsed time is printed as a side effect.
    """
    # perf_counter is the clock intended for measuring elapsed time
    # (monotonic, high resolution), unlike the wall-clock time.time().
    start = time.perf_counter()
    # Look the columns up once instead of on every iteration.
    lots = data_train["LOT_RUNNUM"]
    sensor = data_train["SENSOR_VALUE"]
    data = {}
    for i in lotrunnums:
        # One boolean mask (a full scan of the column) per requested id.
        data[i] = sensor[lots == i].to_numpy()
    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data
您需要创建一个函数,它接收 i 和 data_train 并返回 trace:
def func(data):
    """Return ``(lot_id, values)`` for one ``(data_train, lot_id)`` pair.

    The argument is a single tuple so the function can be handed to APIs
    that pass exactly one item per call (e.g. ``Executor.map``).
    ``values`` is the list of SENSOR_VALUE entries of the rows whose
    LOT_RUNNUM equals ``lot_id``, in row order.
    """
    frame, lot_id = data
    matches = frame["LOT_RUNNUM"] == lot_id
    values = frame.loc[matches, "SENSOR_VALUE"]
    return lot_id, values.tolist()
然后就可以在 for 循环、ThreadPool 或 Joblib 等中运行它,运行结束后再把所有结果转换为字典。
用于测试不同方法的最小可运行示例代码:
import pandas as pd
import numpy as np
import time
import random
# LOT_RUNNUM identifiers used throughout the benchmark.
lotrunnums = [
    'RX9OE_29756162', 'S009K_29952685', '1P32G_29692263', '721YA_29780543',
    '6S3R6_29759571', 'RX9D0_29865357', 'RX9PV_29751006', 'RX9QM_29794268',
]

# How many times each id is repeated in the sample frame. Both columns are
# derived from this value, so their lengths can no longer drift apart (the
# hand-written 80-item list and the separate ``8*10`` had to be kept in sync
# manually, otherwise DataFrame construction fails).
REPEATS = 10

random.seed(0)  # always generate the same values

data_train = pd.DataFrame({
    'LOT_RUNNUM': lotrunnums * REPEATS,
    'SENSOR_VALUE': [random.randint(0, 9) for _ in range(len(lotrunnums) * REPEATS)],
})
def get_data_to_dict_1(data_train, lotrunnums):
    """Baseline: a plain ``for``-loop with one boolean mask per id.

    Parameters
    ----------
    data_train : pandas.DataFrame
        Frame with ``"LOT_RUNNUM"`` and ``"SENSOR_VALUE"`` columns.
    lotrunnums : iterable
        LOT_RUNNUM values to extract.

    Returns
    -------
    dict
        ``{lotrunnum: list of SENSOR_VALUE}`` with keys in ``lotrunnums``
        order; ids that never occur map to ``[]``.

    The elapsed time is printed as a side effect.
    """
    # perf_counter is monotonic and high-resolution, meant for durations.
    start = time.perf_counter()
    lots = data_train["LOT_RUNNUM"]
    sensor = data_train["SENSOR_VALUE"]
    data = {i: sensor[lots == i].to_list() for i in lotrunnums}
    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data
def get_data_to_dict_2(data_train, lotrunnums):
    """Single-pass variant using ``isin()`` and ``groupby()``.

    Instead of scanning the whole frame once per id, keep only the
    requested rows and split them into groups in one pass.

    Returns the same structure as the loop-based variants:
    ``{lotrunnum: list of SENSOR_VALUE}`` with keys in ``lotrunnums`` order
    and ``[]`` for ids that never occur in ``data_train``. A bare
    ``groupby(...).to_dict()`` would instead sort the keys and silently
    drop missing ids. The elapsed time is printed as a side effect.
    """
    start = time.perf_counter()
    wanted = data_train.loc[
        data_train["LOT_RUNNUM"].isin(lotrunnums), ["LOT_RUNNUM", "SENSOR_VALUE"]
    ]
    # groupby keeps the original row order inside each group.
    grouped = {
        key: group.to_list()
        for key, group in wanted.groupby("LOT_RUNNUM", sort=False)["SENSOR_VALUE"]
    }
    data = {i: grouped.get(i, []) for i in lotrunnums}
    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data
def get_data_to_dict_threadpool(data_train, lotrunnums, max_workers=4):
    """Variant that fans the per-id lookups out over a ThreadPoolExecutor.

    Parameters
    ----------
    data_train : pandas.DataFrame
        Frame with ``"LOT_RUNNUM"`` and ``"SENSOR_VALUE"`` columns.
    lotrunnums : iterable
        LOT_RUNNUM values to extract.
    max_workers : int, optional
        Number of worker threads (default 4).

    Returns
    -------
    dict
        ``{lotrunnum: list of SENSOR_VALUE}`` with keys in ``lotrunnums``
        order (``Executor.map`` yields results in submission order); ids
        that never occur map to ``[]``.

    The elapsed time is printed as a side effect.
    """
    from concurrent.futures import ThreadPoolExecutor

    def lookup(i):
        # Threads share memory, so the frame is captured by closure rather
        # than being packed into a tuple for every task.
        trace = data_train["SENSOR_VALUE"][data_train["LOT_RUNNUM"] == i]
        return i, trace.to_list()

    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() returns a lazy iterator of results (not a Future); consume
        # it while the pool is still open.
        data = dict(executor.map(lookup, lotrunnums))
    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data
def get_data_to_dict_joblib(data_train, lotrunnums, n_jobs=4):
    """Variant that runs the per-id lookups with joblib using threads.

    Parameters
    ----------
    data_train : pandas.DataFrame
        Frame with ``"LOT_RUNNUM"`` and ``"SENSOR_VALUE"`` columns.
    lotrunnums : iterable
        LOT_RUNNUM values to extract.
    n_jobs : int, optional
        Number of parallel jobs passed to ``joblib.Parallel`` (default 4).

    Returns
    -------
    dict
        ``{lotrunnum: list of SENSOR_VALUE}``. ``Parallel`` returns results
        in input order, so keys follow ``lotrunnums``.

    The elapsed time is printed as a side effect.
    """
    from joblib import Parallel, delayed

    def func(data_train, i):
        trace = data_train["SENSOR_VALUE"][data_train["LOT_RUNNUM"] == i]
        return i, trace.to_list()

    start = time.perf_counter()
    results = Parallel(n_jobs=n_jobs, prefer="threads")(
        delayed(func)(data_train, i) for i in lotrunnums
    )
    data = dict(results)
    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data
def get_data_to_dict_joblib_process(data_train, lotrunnums, n_jobs=4):
    """Variant that runs the per-id lookups with joblib's default process backend.

    Parameters
    ----------
    data_train : pandas.DataFrame
        Frame with ``"LOT_RUNNUM"`` and ``"SENSOR_VALUE"`` columns.
    lotrunnums : iterable
        LOT_RUNNUM values to extract.
    n_jobs : int, optional
        Number of parallel jobs passed to ``joblib.Parallel`` (default 4).
        ``n_jobs=1`` runs everything sequentially in the calling process.

    Returns
    -------
    dict
        ``{lotrunnum: list of SENSOR_VALUE}``. ``Parallel`` returns results
        in input order, so keys follow ``lotrunnums``.

    The elapsed time is printed as a side effect.

    NOTE(review): ``data_train`` is an argument of every task, so with a
    process backend it has to be serialized and shipped to the workers
    (joblib may memory-map large numeric arrays, but object columns such as
    LOT_RUNNUM are pickled). For large frames this transfer can dominate
    the runtime, so measure before assuming a speed-up.
    """
    from joblib import Parallel, delayed

    def func(data_train, i):
        trace = data_train["SENSOR_VALUE"][data_train["LOT_RUNNUM"] == i]
        return i, trace.to_list()

    start = time.perf_counter()
    results = Parallel(n_jobs=n_jobs)(
        delayed(func)(data_train, i) for i in lotrunnums
    )
    data = dict(results)
    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data
# --- main ---
# Run every variant on the same sample data, in a fixed order, so that both
# the printed timings and the resulting dictionaries can be compared.
for header, method in (
    ('--- normal 1 ---', get_data_to_dict_1),
    ('--- normal 2 ---', get_data_to_dict_2),
    ('--- threadpool ---', get_data_to_dict_threadpool),
    ('--- joblib - thread ---', get_data_to_dict_joblib),
    ('--- joblib - process ---', get_data_to_dict_joblib_process),
):
    print(header)
    print(method(data_train, lotrunnums))
如何使用 joblib 并行化此函数?计算发生在 for 循环内部。
# LOT_RUNNUM identifiers whose SENSOR_VALUE traces are to be collected.
lotrunnums = [
    'RX9OE_29756162',
    'S009K_29952685',
    '1P32G_29692263',
    '721YA_29780543',
    '6S3R6_29759571',
    'RX9D0_29865357',
    'RX9PV_29751006',
    'RX9QM_29794268',
]
def get_data_to_dict(data_train, lotrunnums):
    """Collect the SENSOR_VALUE trace of every requested LOT_RUNNUM.

    Parameters
    ----------
    data_train : pandas.DataFrame
        Frame with ``"LOT_RUNNUM"`` and ``"SENSOR_VALUE"`` columns.
    lotrunnums : iterable
        LOT_RUNNUM values to extract.

    Returns
    -------
    dict
        ``{lotrunnum: numpy.ndarray}`` with keys in ``lotrunnums`` order.
        Each array holds the SENSOR_VALUE entries of the matching rows in
        row order; an id that never occurs maps to an empty array.

    The elapsed time is printed as a side effect.
    """
    # perf_counter is the clock intended for measuring elapsed time
    # (monotonic, high resolution), unlike the wall-clock time.time().
    start = time.perf_counter()
    # Look the columns up once instead of on every iteration.
    lots = data_train["LOT_RUNNUM"]
    sensor = data_train["SENSOR_VALUE"]
    data = {}
    for i in lotrunnums:
        # One boolean mask (a full scan of the column) per requested id.
        data[i] = sensor[lots == i].to_numpy()
    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data
您需要创建一个函数,它接收 i 和 data_train 并返回 trace:
def func(data):
    """Return ``(lot_id, values)`` for one ``(data_train, lot_id)`` pair.

    The argument is a single tuple so the function can be handed to APIs
    that pass exactly one item per call (e.g. ``Executor.map``).
    ``values`` is the list of SENSOR_VALUE entries of the rows whose
    LOT_RUNNUM equals ``lot_id``, in row order.
    """
    frame, lot_id = data
    matches = frame["LOT_RUNNUM"] == lot_id
    values = frame.loc[matches, "SENSOR_VALUE"]
    return lot_id, values.tolist()
然后就可以在 for 循环、ThreadPool 或 Joblib 等中运行它,运行结束后再把所有结果转换为字典。
用于测试不同方法的最小可运行示例代码:
import pandas as pd
import numpy as np
import time
import random
# LOT_RUNNUM identifiers used throughout the benchmark.
lotrunnums = [
    'RX9OE_29756162', 'S009K_29952685', '1P32G_29692263', '721YA_29780543',
    '6S3R6_29759571', 'RX9D0_29865357', 'RX9PV_29751006', 'RX9QM_29794268',
]

# How many times each id is repeated in the sample frame. Both columns are
# derived from this value, so their lengths can no longer drift apart (the
# hand-written 80-item list and the separate ``8*10`` had to be kept in sync
# manually, otherwise DataFrame construction fails).
REPEATS = 10

random.seed(0)  # always generate the same values

data_train = pd.DataFrame({
    'LOT_RUNNUM': lotrunnums * REPEATS,
    'SENSOR_VALUE': [random.randint(0, 9) for _ in range(len(lotrunnums) * REPEATS)],
})
def get_data_to_dict_1(data_train, lotrunnums):
    """Baseline: a plain ``for``-loop with one boolean mask per id.

    Parameters
    ----------
    data_train : pandas.DataFrame
        Frame with ``"LOT_RUNNUM"`` and ``"SENSOR_VALUE"`` columns.
    lotrunnums : iterable
        LOT_RUNNUM values to extract.

    Returns
    -------
    dict
        ``{lotrunnum: list of SENSOR_VALUE}`` with keys in ``lotrunnums``
        order; ids that never occur map to ``[]``.

    The elapsed time is printed as a side effect.
    """
    # perf_counter is monotonic and high-resolution, meant for durations.
    start = time.perf_counter()
    lots = data_train["LOT_RUNNUM"]
    sensor = data_train["SENSOR_VALUE"]
    data = {i: sensor[lots == i].to_list() for i in lotrunnums}
    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data
def get_data_to_dict_2(data_train, lotrunnums):
    """Single-pass variant using ``isin()`` and ``groupby()``.

    Instead of scanning the whole frame once per id, keep only the
    requested rows and split them into groups in one pass.

    Returns the same structure as the loop-based variants:
    ``{lotrunnum: list of SENSOR_VALUE}`` with keys in ``lotrunnums`` order
    and ``[]`` for ids that never occur in ``data_train``. A bare
    ``groupby(...).to_dict()`` would instead sort the keys and silently
    drop missing ids. The elapsed time is printed as a side effect.
    """
    start = time.perf_counter()
    wanted = data_train.loc[
        data_train["LOT_RUNNUM"].isin(lotrunnums), ["LOT_RUNNUM", "SENSOR_VALUE"]
    ]
    # groupby keeps the original row order inside each group.
    grouped = {
        key: group.to_list()
        for key, group in wanted.groupby("LOT_RUNNUM", sort=False)["SENSOR_VALUE"]
    }
    data = {i: grouped.get(i, []) for i in lotrunnums}
    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data
def get_data_to_dict_threadpool(data_train, lotrunnums, max_workers=4):
    """Variant that fans the per-id lookups out over a ThreadPoolExecutor.

    Parameters
    ----------
    data_train : pandas.DataFrame
        Frame with ``"LOT_RUNNUM"`` and ``"SENSOR_VALUE"`` columns.
    lotrunnums : iterable
        LOT_RUNNUM values to extract.
    max_workers : int, optional
        Number of worker threads (default 4).

    Returns
    -------
    dict
        ``{lotrunnum: list of SENSOR_VALUE}`` with keys in ``lotrunnums``
        order (``Executor.map`` yields results in submission order); ids
        that never occur map to ``[]``.

    The elapsed time is printed as a side effect.
    """
    from concurrent.futures import ThreadPoolExecutor

    def lookup(i):
        # Threads share memory, so the frame is captured by closure rather
        # than being packed into a tuple for every task.
        trace = data_train["SENSOR_VALUE"][data_train["LOT_RUNNUM"] == i]
        return i, trace.to_list()

    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() returns a lazy iterator of results (not a Future); consume
        # it while the pool is still open.
        data = dict(executor.map(lookup, lotrunnums))
    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data
def get_data_to_dict_joblib(data_train, lotrunnums, n_jobs=4):
    """Variant that runs the per-id lookups with joblib using threads.

    Parameters
    ----------
    data_train : pandas.DataFrame
        Frame with ``"LOT_RUNNUM"`` and ``"SENSOR_VALUE"`` columns.
    lotrunnums : iterable
        LOT_RUNNUM values to extract.
    n_jobs : int, optional
        Number of parallel jobs passed to ``joblib.Parallel`` (default 4).

    Returns
    -------
    dict
        ``{lotrunnum: list of SENSOR_VALUE}``. ``Parallel`` returns results
        in input order, so keys follow ``lotrunnums``.

    The elapsed time is printed as a side effect.
    """
    from joblib import Parallel, delayed

    def func(data_train, i):
        trace = data_train["SENSOR_VALUE"][data_train["LOT_RUNNUM"] == i]
        return i, trace.to_list()

    start = time.perf_counter()
    results = Parallel(n_jobs=n_jobs, prefer="threads")(
        delayed(func)(data_train, i) for i in lotrunnums
    )
    data = dict(results)
    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data
def get_data_to_dict_joblib_process(data_train, lotrunnums, n_jobs=4):
    """Variant that runs the per-id lookups with joblib's default process backend.

    Parameters
    ----------
    data_train : pandas.DataFrame
        Frame with ``"LOT_RUNNUM"`` and ``"SENSOR_VALUE"`` columns.
    lotrunnums : iterable
        LOT_RUNNUM values to extract.
    n_jobs : int, optional
        Number of parallel jobs passed to ``joblib.Parallel`` (default 4).
        ``n_jobs=1`` runs everything sequentially in the calling process.

    Returns
    -------
    dict
        ``{lotrunnum: list of SENSOR_VALUE}``. ``Parallel`` returns results
        in input order, so keys follow ``lotrunnums``.

    The elapsed time is printed as a side effect.

    NOTE(review): ``data_train`` is an argument of every task, so with a
    process backend it has to be serialized and shipped to the workers
    (joblib may memory-map large numeric arrays, but object columns such as
    LOT_RUNNUM are pickled). For large frames this transfer can dominate
    the runtime, so measure before assuming a speed-up.
    """
    from joblib import Parallel, delayed

    def func(data_train, i):
        trace = data_train["SENSOR_VALUE"][data_train["LOT_RUNNUM"] == i]
        return i, trace.to_list()

    start = time.perf_counter()
    results = Parallel(n_jobs=n_jobs)(
        delayed(func)(data_train, i) for i in lotrunnums
    )
    data = dict(results)
    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data
# --- main ---
# Run every variant on the same sample data, in a fixed order, so that both
# the printed timings and the resulting dictionaries can be compared.
for header, method in (
    ('--- normal 1 ---', get_data_to_dict_1),
    ('--- normal 2 ---', get_data_to_dict_2),
    ('--- threadpool ---', get_data_to_dict_threadpool),
    ('--- joblib - thread ---', get_data_to_dict_joblib),
    ('--- joblib - process ---', get_data_to_dict_joblib_process),
):
    print(header)
    print(method(data_train, lotrunnums))