使用多处理并行化 Keras 模型预测

Parallelizing Keras Model Predict Using Multiprocessing

我有一个有 60 个 CPU 的系统。我打算在多个图像上并行化 Keras 模型的预测。我尝试了以下代码:

img_model1 = tensorflow.keras.models.load_model('my_model.h5')
img_model2 = tensorflow.keras.models.load_model('my_model.h5')
img_model3 = tensorflow.keras.models.load_model('my_model.h5')
models=[img_model1,img_model2,img_model3] # all the three are same models

我尝试使用索引来避免 weakref pickling 错误:

def _apply_df(args):
    df, model_index = args
    preds=prediction_generator(df['image_path'])
    return  models[model_index].predict(preds)

def apply_by_multiprocessing(df, workers):
    workers = workers
    pool = Pool(processes=workers)
    result = pool.map(_apply_df, [(d,i) for i,d in enumerate(np.array_split(df[['image_path']], workers))])
    pool.close()
    return pd.concat(list(result))
    

apply_by_multiprocessing(df=data, workers=3) 

代码保持 运行 永远不会产生任何结果... 我想这个问题可以用 tf.Sessions() 解决,但我不确定如何...

_apply_df 函数中加载您的模型,因此它不会参与酸洗和发送到进程。

这是一个简单的代码示例,未使用 pandas 在 fashion-mnist 数据上运行模型。我认为您可以根据您的用例调整它。

import tensorflow as tf
import numpy as np
from multiprocessing import Pool


def _apply_df(data):
    model = tf.keras.models.load_model("my_fashion_mnist_model.h5")
    return model.predict(data)


def apply_by_multiprocessing(data, workers):

    pool = Pool(processes=workers)
    result = pool.map(_apply_df, np.array_split(data, workers))
    pool.close()
    return list(result)


def main():
    fashion_mnist = tf.keras.datasets.fashion_mnist
    _, (test_images, test_labels) = fashion_mnist.load_data()

    test_images = test_images / 255.0
    results = apply_by_multiprocessing(test_images, workers=3)
    print(test_images.shape)           # (10000, 28, 28)
    print(len(results))                # 3
    print([x.shape for x in results])  # [(3334, 10), (3333, 10), (3333, 10)]


if __name__ == "__main__":
    main()