Predicting in parallel using concurrent.futures with tensorflow.keras models

I am trying to run some parallel jobs using concurrent.futures. Each worker needs a copy of a TensorFlow model plus some data. I implemented it the following way (MWE):

import tensorflow as tf
from tensorflow import keras
import numpy as np
import concurrent.futures 
import time


def simple_model():
    model = keras.models.Sequential([
        keras.layers.Dense(units = 10, input_shape = [1]),
        keras.layers.Dense(units = 1, activation = 'sigmoid')
    ])
    model.compile(optimizer = 'sgd', loss = 'mean_squared_error')
    return model

def clone_model(model):
    model_clone = tf.keras.models.clone_model(model)
    model_clone.set_weights(model.get_weights())
    return model_clone

def work(model, seq):
    return model.predict(seq)

def worker(model, num_of_seq = 4):
    sequences = np.arange(0, 100).reshape(num_of_seq, -1)
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:        
        t0 = time.perf_counter()
        model_list = [clone_model(model) for _ in range(num_of_seq)]
        future_to_samples = {executor.submit(work, model, seq): seq for model, seq in zip(model_list, sequences)}
    Seq_out = []
    for future in concurrent.futures.as_completed(future_to_samples):
        out = future.result()
        Seq_out.append(out)
    t1 = time.perf_counter()
    print(t1-t0)
    return np.reshape(Seq_out, (-1, )), t1-t0



if __name__ == '__main__':
    model = simple_model()
    out = worker(model, num_of_seq=4)
    print(out)

simple_model() creates the model, clone_model clones a TensorFlow model, work is a stand-in (MWE) for the actual work, and worker distributes work across the pool in parallel.

This does not work: it simply hangs and never produces any output. If I replace ProcessPoolExecutor with ThreadPoolExecutor, however, the code above runs, but it gives no speedup (presumably because the workers do not actually run in parallel).
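For reference, the one-line change that makes the first version run (though without any speedup) is swapping the executor class:

with concurrent.futures.ThreadPoolExecutor(max_workers=None) as executor:
    # threads share the parent's memory, so the model object is never
    # pickled and the submit calls complete
    future_to_samples = {executor.submit(work, model, seq): seq for model, seq in zip(model_list, sequences)}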

From my understanding, the problem is that the model itself is passed as an argument in future_to_samples = {executor.submit(work, model, seq): seq for model, seq in zip(model_list, sequences)}.
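A quick way to check this hypothesis, assuming the failure is serialization: ProcessPoolExecutor pickles every argument before shipping it to a child process, and a live Keras model (depending on the TensorFlow version) either cannot be pickled at all or does not survive the round trip cleanly. A minimal sketch, reusing simple_model() from above:

import pickle

# try to serialize the model the same way ProcessPoolExecutor would;
# on many TensorFlow versions this raises (e.g. "TypeError: cannot
# pickle '_thread.RLock' object"), on newer ones it may succeed
try:
    pickle.dumps(simple_model())
except Exception as e:
    print(type(e).__name__, ':', e)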

I modified the code so that it sends the path of a saved model, rather than the model object itself, to the subprocess. That works:

import tensorflow as tf
from tensorflow import keras

import numpy as np
import concurrent.futures 
import time

# gpus = tf.config.experimental.list_physical_devices('GPU')
# if len(gpus) > 0:
#     print(f'GPUs {gpus}')
#     try: tf.config.experimental.set_memory_growth(gpus[0], True)
#     except RuntimeError: pass

def simple_model():
    model = keras.models.Sequential([
        keras.layers.Dense(units = 10, input_shape = [1]),
        keras.layers.Dense(units = 1, activation = 'sigmoid')
    ])
    model.compile(optimizer = 'sgd', loss = 'mean_squared_error')
    return model

def clone_model(model):
    model_clone = tf.keras.models.clone_model(model)
    model_clone.set_weights(model.get_weights())
    return model_clone

def work(model_path, seq):
    # load a fresh copy of the model inside the child process
    model = tf.keras.models.load_model(model_path)
    return model.predict(seq)

def worker(model, num_of_seq = 4):
    sequences = np.arange(0, num_of_seq*10).reshape(num_of_seq, -1)
    model_savepath = './simple_model.h5'
    model.save(model_savepath)
    # every task receives the path to the same saved model file
    path_list = [model_savepath for _ in range(num_of_seq)]
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        t0 = time.perf_counter()
        future_to_samples = {executor.submit(work, path, seq): seq for path, seq in zip(path_list, sequences)}
    Seq_out = []
    for future in concurrent.futures.as_completed(future_to_samples):
        out = future.result()
        Seq_out.append(out)
    t1 = time.perf_counter()
    print(t1-t0)
    return np.reshape(Seq_out, (-1, )), t1-t0



if __name__ == '__main__':
    model = simple_model()
    num_of_seq = 400
    out = worker(model, num_of_seq=num_of_seq)
    print(out)
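One design note on the path-based version: every submitted task reloads the model from disk inside work. If that overhead matters, ProcessPoolExecutor accepts initializer/initargs (Python 3.7+), which can load the model once per child process instead of once per task. A minimal sketch of that idea; _init_worker, work_preloaded, and the module-level _model are hypothetical helpers, not part of the code above, and the call still belongs under an if __name__ == '__main__' guard:

import concurrent.futures
import numpy as np
import tensorflow as tf

_model = None  # filled in once per child process by the initializer

def _init_worker(model_path):
    # runs once in each child process when the pool starts
    global _model
    _model = tf.keras.models.load_model(model_path)

def work_preloaded(seq):
    # uses the model loaded by _init_worker instead of reloading it
    return _model.predict(seq)

def worker_preloaded(model, num_of_seq=4):
    sequences = np.arange(0, num_of_seq * 10).reshape(num_of_seq, -1)
    model_savepath = './simple_model.h5'
    model.save(model_savepath)
    with concurrent.futures.ProcessPoolExecutor(
            initializer=_init_worker,
            initargs=(model_savepath,)) as executor:
        return list(executor.map(work_preloaded, sequences))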