函数的 Joblib 并行计算

Joblib parallel computation for function

如何使用 Joblib 并行化此函数？计算发生在 for 循环内部。
# Lot identifiers whose SENSOR_VALUE traces should be collected.
# The opening bracket must be on the same line as "=": a bare "lotrunnums="
# followed by a newline is a SyntaxError.
lotrunnums = [
    'RX9OE_29756162', 'S009K_29952685', '1P32G_29692263', '721YA_29780543',
    '6S3R6_29759571', 'RX9D0_29865357', 'RX9PV_29751006', 'RX9QM_29794268',
]

#function

def get_data_to_dict(data_train, lotrunnums):
    """Collect the SENSOR_VALUE readings of each lot into a dict.

    Parameters
    ----------
    data_train : pandas.DataFrame
        Must contain the columns ``LOT_RUNNUM`` and ``SENSOR_VALUE``.
    lotrunnums : iterable
        Lot identifiers to look up. Ids that do not occur in ``data_train``
        map to an empty array.

    Returns
    -------
    dict
        ``{lot_id: numpy.ndarray of that lot's SENSOR_VALUE values}``, with
        keys in the order of ``lotrunnums``.

    The elapsed time is printed as a side effect.
    """
    # perf_counter is the monotonic clock intended for measuring intervals;
    # time.time() can jump if the system clock is adjusted.
    start = time.perf_counter()
    data = {}
    for i in lotrunnums:
        # Boolean mask over the whole column: one full scan per lot id.
        trace = data_train["SENSOR_VALUE"][data_train["LOT_RUNNUM"] == i]
        # Plain item assignment; no need to build two temporary dicts.
        data[i] = np.array(trace)
    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data

您必须创建一个函数，它接收 `data_train` 和 `i`，并返回 `trace`：

    def func(data):
        """Return ``(lot_id, values)`` for one ``(data_train, lot_id)`` pair.

        ``values`` is a plain list of the SENSOR_VALUE entries whose
        LOT_RUNNUM equals ``lot_id`` (empty if there are none).
        """
        frame, lot = data
        selected = frame.loc[frame["LOT_RUNNUM"] == lot, "SENSOR_VALUE"]
        return lot, selected.to_list()

然后你可以在 for 循环、ThreadPool 或 Joblib 等中运行它。

运行结束后，您可以将所有结果转换为字典。


用于测试不同方法的最小可运行代码：

import pandas as pd
import numpy as np
import time
import random

random.seed(0)  # always generate the same values

# Lot identifiers to look up. Defined before the frame so that the frame's
# id column can be derived from it instead of being typed out by hand.
lotrunnums = [
    'RX9OE_29756162', 'S009K_29952685', '1P32G_29692263', '721YA_29780543',
    '6S3R6_29759571', 'RX9D0_29865357', 'RX9PV_29751006', 'RX9QM_29794268',
]

# Number of rows generated for every lot id in the synthetic test frame.
ROWS_PER_LOT = 10

# Synthetic input: the ids cycle in order, ROWS_PER_LOT times. Both columns
# are sized from the same expression, so they cannot disagree in length.
# Only this code consumes the seeded random stream, so the values equal the
# previous hard-coded "range(8*10)" version.
data_train = pd.DataFrame({
    'LOT_RUNNUM': lotrunnums * ROWS_PER_LOT,
    'SENSOR_VALUE': [
        random.randint(0, 9) for _ in range(len(lotrunnums) * ROWS_PER_LOT)
    ],
})

def get_data_to_dict_1(data_train, lotrunnums):
    """Build ``{lot_id: [SENSOR_VALUE, ...]}`` using a plain ``for`` loop.

    Each id in ``lotrunnums`` is looked up with a boolean mask over the
    whole frame, so the work grows with ``len(lotrunnums) * len(data_train)``.
    Keys follow the order of ``lotrunnums``. Ids that do not occur in
    ``data_train`` map to an empty list. The elapsed time is printed.
    """
    # Monotonic clock intended for measuring intervals.
    start = time.perf_counter()

    data = {}
    for i in lotrunnums:
        trace = data_train["SENSOR_VALUE"][data_train["LOT_RUNNUM"] == i]
        # Store a plain Python list so the result prints compactly.
        data[i] = trace.to_list()

    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data

def get_data_to_dict_2(data_train, lotrunnums):
    """Build ``{lot_id: [SENSOR_VALUE, ...]}`` using ``isin()`` and ``groupby()``.

    The frame is filtered once with ``isin`` and split with a single
    ``groupby``, instead of scanning it once per lot id. A bare ``groupby``
    returns its keys sorted and omits ids that have no rows. The result is
    therefore laid out in the order of ``lotrunnums``, and absent ids map to
    an empty list, which matches the ``for``-loop approach. The elapsed time
    is printed.
    """
    start = time.perf_counter()

    subset = data_train.loc[
        data_train["LOT_RUNNUM"].isin(lotrunnums), ["LOT_RUNNUM", "SENSOR_VALUE"]
    ]
    # Iterating a SeriesGroupBy yields (key, Series) pairs.
    grouped = {
        lot: values.to_list()
        for lot, values in subset.groupby("LOT_RUNNUM")["SENSOR_VALUE"]
    }
    # Re-key in the caller's order and fill ids that had no rows.
    data = {lot: grouped.get(lot, []) for lot in lotrunnums}

    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data

def get_data_to_dict_threadpool(data_train, lotrunnums, max_workers=4):
    """Build ``{lot_id: [SENSOR_VALUE, ...]}`` using a ThreadPoolExecutor.

    One task is submitted per lot id, and each task filters ``data_train``
    with a boolean mask. Keys follow the order of ``lotrunnums``. Ids that do
    not occur in ``data_train`` map to an empty list. The elapsed time is
    printed.

    Parameters
    ----------
    max_workers : int, optional
        Number of worker threads (default 4, the previously hard-coded value).
    """
    from concurrent.futures import ThreadPoolExecutor

    def func(i):
        # Threads share memory, so the frame is read from the enclosing
        # scope instead of being packed into a tuple for every task.
        trace = data_train["SENSOR_VALUE"][data_train["LOT_RUNNUM"] == i]
        return i, trace.to_list()

    start = time.perf_counter()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() returns an iterator of results in input order (not a Future).
        data = dict(executor.map(func, lotrunnums))

    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data

def get_data_to_dict_joblib(data_train, lotrunnums, n_jobs=4):
    """Build ``{lot_id: [SENSOR_VALUE, ...]}`` with joblib using threads.

    ``prefer="threads"`` asks joblib for its threading backend, so the
    workers receive a reference to ``data_train`` rather than a copy.
    ``Parallel`` returns results in input order, so keys follow
    ``lotrunnums``. The elapsed time is printed.

    Parameters
    ----------
    n_jobs : int, optional
        Number of workers passed to ``joblib.Parallel`` (default 4, the
        previously hard-coded value).
    """
    from joblib import Parallel, delayed

    def func(data_train, i):
        trace = data_train["SENSOR_VALUE"][data_train["LOT_RUNNUM"] == i]
        return i, trace.to_list()

    start = time.perf_counter()

    results = Parallel(n_jobs=n_jobs, prefer="threads")(
        delayed(func)(data_train, i) for i in lotrunnums
    )
    data = dict(results)

    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data

def get_data_to_dict_joblib_process(data_train, lotrunnums, n_jobs=4):
    """Build ``{lot_id: [SENSOR_VALUE, ...]}`` with joblib using processes.

    Without ``prefer="threads"``, joblib uses its default process-based
    backend, so ``data_train`` has to be serialized and sent to the worker
    processes. For lookups this cheap, that transfer overhead is likely to
    outweigh any parallel speed-up; measure before relying on it.
    ``Parallel`` returns results in input order, so keys follow
    ``lotrunnums``. The elapsed time is printed.

    Parameters
    ----------
    n_jobs : int, optional
        Number of workers passed to ``joblib.Parallel`` (default 4, the
        previously hard-coded value).
    """
    from joblib import Parallel, delayed

    def func(data_train, i):
        trace = data_train["SENSOR_VALUE"][data_train["LOT_RUNNUM"] == i]
        return i, trace.to_list()

    start = time.perf_counter()

    results = Parallel(n_jobs=n_jobs)(
        delayed(func)(data_train, i) for i in lotrunnums
    )
    data = dict(results)

    end = time.perf_counter()
    print('{:.4f} s'.format(end - start))
    return data

# --- main ---

if __name__ == "__main__":
    # Only run the benchmarks when executed as a script. Process-based
    # workers may import this module (e.g. under the "spawn" start method),
    # and an unguarded driver would then re-run every benchmark on import.
    benchmarks = [
        ('normal 1', get_data_to_dict_1),
        ('normal 2', get_data_to_dict_2),
        ('threadpool', get_data_to_dict_threadpool),
        ('joblib - thread', get_data_to_dict_joblib),
        ('joblib - process', get_data_to_dict_joblib_process),
    ]
    for label, method in benchmarks:
        print('--- {} ---'.format(label))
        print(method(data_train, lotrunnums))