使用多进程(multiprocessing)并行化 Keras 模型预测
Parallelizing Keras Model Predict Using Multiprocessing
我有一个有 60 个 CPU 的系统。我打算在多个图像上并行化 Keras 模型的预测。我尝试了以下代码:
# Load the same saved model file three times; `_apply_df` later selects one of
# these copies by index via `models[model_index]`.
img_model1 = tensorflow.keras.models.load_model('my_model.h5')
img_model2 = tensorflow.keras.models.load_model('my_model.h5')
img_model3 = tensorflow.keras.models.load_model('my_model.h5')
models=[img_model1,img_model2,img_model3] # all three entries are the same model
我尝试使用索引来避免 weakref pickling 错误:
def _apply_df(args):
    """Predict on one DataFrame chunk with the model chosen by index.

    Args:
        args: A ``(df, model_index)`` pair. ``df`` must have an
            ``'image_path'`` column; ``model_index`` selects an entry of the
            module-level ``models`` list.

    Returns:
        Whatever ``predict`` returns for the selected model.
    """
    frame, idx = args
    # Build the model input from the image paths before picking the model.
    batches = prediction_generator(frame['image_path'])
    chosen_model = models[idx]
    return chosen_model.predict(batches)
def apply_by_multiprocessing(df, workers):
    """Split ``df`` into ``workers`` chunks and run ``_apply_df`` on each in a pool.

    Args:
        df: DataFrame with an ``'image_path'`` column.
        workers: Number of worker processes; also the number of chunks.

    Returns:
        The per-chunk results combined with ``pd.concat``.
    """
    pool = Pool(processes=workers)
    chunks = np.array_split(df[['image_path']], workers)
    # Pair every chunk with its position so the worker can look up a model
    # by index instead of receiving the model object itself.
    tasks = [(chunk, position) for position, chunk in enumerate(chunks)]
    outputs = pool.map(_apply_df, tasks)
    pool.close()
    # NOTE(review): pd.concat expects pandas objects; if `_apply_df` returns
    # NumPy arrays this call would raise - confirm the return type.
    return pd.concat(list(outputs))
# Run the prediction over `data` using three worker processes.
# The return value is not captured here.
apply_by_multiprocessing(df=data, workers=3)
代码一直在运行,却始终不产生任何结果……
我想这个问题可以用 tf.Session() 解决,但我不确定该怎么做……
在 _apply_df 函数内部加载您的模型,这样模型就不需要被序列化(pickle)并发送给子进程。
下面是一个不使用 pandas 的简单代码示例,它在 fashion-mnist 数据上运行模型。我认为您可以根据自己的用例进行调整。
import tensorflow as tf
import numpy as np
from multiprocessing import Pool
def _apply_df(data):
    """Load the saved model inside the worker and predict on one chunk.

    The model is loaded here, in the process that uses it, instead of being
    handed over from the parent, so it never has to be pickled.

    Args:
        data: One chunk of input passed straight to ``predict``.

    Returns:
        The result of ``predict`` for this chunk.
    """
    worker_model = tf.keras.models.load_model("my_fashion_mnist_model.h5")
    predictions = worker_model.predict(data)
    return predictions
def apply_by_multiprocessing(data, workers):
    """Split ``data`` into ``workers`` chunks and predict each in its own process.

    Args:
        data: Array-like input accepted by ``np.array_split``.
        workers: Number of worker processes; also the number of chunks.

    Returns:
        A list with one ``_apply_df`` result per chunk, in chunk order.
    """
    chunks = np.array_split(data, workers)
    # The context manager tears the pool down on every exit path, including
    # when a worker raises; a bare close() without join() would leave the
    # worker processes behind in that case.
    with Pool(processes=workers) as pool:
        # map() blocks until every chunk is done and already returns a list.
        return pool.map(_apply_df, chunks)
def main():
    """Predict on the Fashion-MNIST test images using three worker processes."""
    dataset = tf.keras.datasets.fashion_mnist
    # Only the test split is used; the training split is discarded.
    _, (test_images, _labels) = dataset.load_data()
    # Divide the pixel values by 255.0 before prediction.
    scaled_images = test_images / 255.0
    results = apply_by_multiprocessing(scaled_images, workers=3)
    print(scaled_images.shape)  # (10000, 28, 28)
    print(len(results))  # 3
    print([x.shape for x in results])  # [(3334, 10), (3333, 10), (3333, 10)]
# Entry-point guard: call main() only when this file is run as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
我有一个有 60 个 CPU 的系统。我打算在多个图像上并行化 Keras 模型的预测。我尝试了以下代码:
# Load the same saved model file three times; `_apply_df` later selects one of
# these copies by index via `models[model_index]`.
img_model1 = tensorflow.keras.models.load_model('my_model.h5')
img_model2 = tensorflow.keras.models.load_model('my_model.h5')
img_model3 = tensorflow.keras.models.load_model('my_model.h5')
models=[img_model1,img_model2,img_model3] # all three entries are the same model
我尝试使用索引来避免 weakref pickling 错误:
def _apply_df(args):
    """Predict on one DataFrame chunk with the model chosen by index.

    Args:
        args: A ``(df, model_index)`` pair. ``df`` must have an
            ``'image_path'`` column; ``model_index`` selects an entry of the
            module-level ``models`` list.

    Returns:
        Whatever ``predict`` returns for the selected model.
    """
    frame, idx = args
    # Build the model input from the image paths before picking the model.
    batches = prediction_generator(frame['image_path'])
    chosen_model = models[idx]
    return chosen_model.predict(batches)
def apply_by_multiprocessing(df, workers):
    """Split ``df`` into ``workers`` chunks and run ``_apply_df`` on each in a pool.

    Args:
        df: DataFrame with an ``'image_path'`` column.
        workers: Number of worker processes; also the number of chunks.

    Returns:
        The per-chunk results combined with ``pd.concat``.
    """
    pool = Pool(processes=workers)
    chunks = np.array_split(df[['image_path']], workers)
    # Pair every chunk with its position so the worker can look up a model
    # by index instead of receiving the model object itself.
    tasks = [(chunk, position) for position, chunk in enumerate(chunks)]
    outputs = pool.map(_apply_df, tasks)
    pool.close()
    # NOTE(review): pd.concat expects pandas objects; if `_apply_df` returns
    # NumPy arrays this call would raise - confirm the return type.
    return pd.concat(list(outputs))
# Run the prediction over `data` using three worker processes.
# The return value is not captured here.
apply_by_multiprocessing(df=data, workers=3)
代码一直在运行,却始终不产生任何结果……我想这个问题可以用 tf.Session() 解决,但我不确定该怎么做……
在 _apply_df 函数内部加载您的模型,这样模型就不需要被序列化(pickle)并发送给子进程。
下面是一个不使用 pandas 的简单代码示例,它在 fashion-mnist 数据上运行模型。我认为您可以根据自己的用例进行调整。
import tensorflow as tf
import numpy as np
from multiprocessing import Pool
def _apply_df(data):
    """Load the saved model inside the worker and predict on one chunk.

    The model is loaded here, in the process that uses it, instead of being
    handed over from the parent, so it never has to be pickled.

    Args:
        data: One chunk of input passed straight to ``predict``.

    Returns:
        The result of ``predict`` for this chunk.
    """
    worker_model = tf.keras.models.load_model("my_fashion_mnist_model.h5")
    predictions = worker_model.predict(data)
    return predictions
def apply_by_multiprocessing(data, workers):
    """Split ``data`` into ``workers`` chunks and predict each in its own process.

    Args:
        data: Array-like input accepted by ``np.array_split``.
        workers: Number of worker processes; also the number of chunks.

    Returns:
        A list with one ``_apply_df`` result per chunk, in chunk order.
    """
    chunks = np.array_split(data, workers)
    # The context manager tears the pool down on every exit path, including
    # when a worker raises; a bare close() without join() would leave the
    # worker processes behind in that case.
    with Pool(processes=workers) as pool:
        # map() blocks until every chunk is done and already returns a list.
        return pool.map(_apply_df, chunks)
def main():
    """Predict on the Fashion-MNIST test images using three worker processes."""
    dataset = tf.keras.datasets.fashion_mnist
    # Only the test split is used; the training split is discarded.
    _, (test_images, _labels) = dataset.load_data()
    # Divide the pixel values by 255.0 before prediction.
    scaled_images = test_images / 255.0
    results = apply_by_multiprocessing(scaled_images, workers=3)
    print(scaled_images.shape)  # (10000, 28, 28)
    print(len(results))  # 3
    print([x.shape for x in results])  # [(3334, 10), (3333, 10), (3333, 10)]
# Entry-point guard: call main() only when this file is run as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()