Parallelizing model predictions in Keras using multiprocessing in Python
I am trying to perform model predictions in parallel using the model.predict command provided by Keras in Python 2. I am using TensorFlow 1.14.0 for Python 2. I have 5 model (.h5) files and would like the predict commands to run in parallel. This is being run in Python 2.7. I am using a multiprocessing Pool to map the model filenames to the prediction function across multiple processes, as shown below,
import matplotlib as plt
import numpy as np
import cv2
import time
from multiprocessing import Pool

pool = Pool()

def prediction(model_name):
    global input
    from tensorflow.keras.models import load_model
    model = load_model(model_name)
    ret_val = model.predict(input).tolist()[0]
    return ret_val

models = ['model1.h5', 'model2.h5', 'model3.h5', 'model4.h5', 'model5.h5']
start_time = time.time()
res = pool.map(prediction, models)
print('Total time taken: {}'.format(time.time() - start_time))
print(res)
The input is an image numpy array obtained from another part of the code. But when I run this, I get the following,
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
AttributeError: 'module' object has no attribute 'prediction'

(the same traceback is printed by each worker process)
I am unable to interpret this error message. How can I resolve it? Any advice is much appreciated!
UPDATE 2:
Thanks for all the pointers and for the full example, @sokato. I ran the exact code posted by @sokato, but I got the following error (I also made the corresponding changes in my own code and got the same error, shown below),
Traceback (most recent call last):
  File "Whosebug.py", line 47, in <module>
    with multiprocessing.Pool() as p:
AttributeError: __exit__
UPDATE 3:
Thanks for all the support. I think the issue in UPDATE 2 was due to using Python 2 instead of Python 3. By using with closing(multiprocessing.Pool()) as p: instead of with multiprocessing.Pool() as p: in @sokato's code, I was able to resolve the error shown in UPDATE 2 for Python 2. The closing function is imported as follows: from contextlib import closing
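In outline, the pool usage then looks something like this (a minimal sketch with a placeholder worker function, not the actual prediction code):

from contextlib import closing
import multiprocessing

def worker(x):
    # placeholder for the real prediction function
    return x * 2

if __name__ == "__main__":
    inputs = [1, 2, 3, 4, 5]
    # closing() supplies the context-manager behaviour that Pool lacks in python 2.7
    with closing(multiprocessing.Pool()) as p:
        results = p.map(worker, inputs)
    p.join()  # closing() only calls close(), so wait for the workers explicitly
    print(results)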
New issue, with a different approach shown below:
I actually have multiple inputs. Instead of loading the models every time for each input, I want to load all the models beforehand and keep them in a list. I have done that as shown below,
import matplotlib as plt
import numpy as np
import cv2
import multiprocessing
import tensorflow as tf
from contextlib import closing
import time

models = ['model1.h5', 'model2.h5', 'model3.h5', 'model4.h5', 'model5.h5']
loaded_models = []
for model in models:
    loaded_models.append(tf.keras.models.load_model(model))

def prediction(input_tuple):
    inputs, loaded_models = input_tuple
    predops = []
    for model in loaded_models:
        predops.append(model.predict(inputs).tolist()[0])
    actops = []
    for predop in predops:
        actops.append(predop.index(max(predop)))
    max_freqq = max(set(actops), key=actops.count)
    return max_freqq

#....some pre-processing....#

'''new_all_t is a list which contains tuples and each tuple has inputs from all_t
and the list containing loaded models which will be extracted
in the prediction function.'''
new_all_t = []
for elem in all_t:
    new_all_t.append((elem, loaded_models))

start_time = time.time()
with closing(multiprocessing.Pool()) as p:
    predops = p.map(prediction, new_all_t)
print('Total time taken: {}'.format(time.time() - start_time))
new_all_t is a list of tuples; each tuple has an input from all_t and the list of loaded models, which are extracted in the prediction function. However, I now get the following error,
Traceback (most recent call last):
  File "trial_mult-ips.py", line 240, in <module>
    predops=p.map(prediction,new_all_t)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
NotImplementedError: numpy() is only available when eager execution is enabled.
What exactly does this mean, and how can I fix it?
UPDATE 4:
I included the lines tf.compat.v1.enable_eager_execution() and tf.compat.v1.enable_v2_behavior() at the beginning. Now I get the following error,
WARNING:tensorflow:From /home/nick/.local/lib/python2.7/site-packages/tensorflow/python/ops/math_grad.py:1250: where (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where

Traceback (most recent call last):
  File "the_other_end-mp.py", line 216, in <module>
    predops=p.map(prediction,modelon)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
ValueError: Resource handles are not convertible to numpy.
I am unable to interpret this error message. How can I resolve it? Any advice is much appreciated!
So, I'm not sure about some of your design choices, but I've made my best attempt with the given information. Specifically, I think there may be issues with the global variable and the import statement inside your parallel function.
You should use a shared variable, not a global, to share the input between processes. You can read more about shared memory in the multiprocessing documentation if you want.
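As a minimal sketch of that shared-array idea (the names and shape here are illustrative; the full example below uses the same pattern):

import ctypes
import multiprocessing
import numpy as np

shape = (100, 28, 28)  # illustrative input shape
shared_base = multiprocessing.Array(ctypes.c_double,
                                    int(np.prod(shape)), lock=False)

# View the shared buffer as a numpy array; worker processes inherit it when
# the pool forks, so the input is not re-pickled for every task.
shared_input = np.ctypeslib.as_array(shared_base).reshape(shape)
shared_input[:] = np.random.random(shape)  # fill with the real inputs here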
I generated the models from a tutorial, since your models are not included.
You were not joining or closing your pool, but with the following code I was able to get it to execute in parallel successfully. You can close the pool by calling pool.close() or by using the "with" syntax shown below. Note that the "with" syntax doesn't work in Python 2.7.
import numpy as np
import multiprocessing, time, ctypes, os
import tensorflow as tf

mis = (28, 28) #model input shape
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

def createModels(models):
    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=mis),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10)
    ])
    model.compile(optimizer='adam',
                  loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])
    model.fit(x_train, y_train, epochs=5)
    for mod in models:
        model.save(mod)

def prediction(model_name):
    model = tf.keras.models.load_model(model_name)
    ret_val = model.predict(input).tolist()[0]
    return ret_val

if __name__ == "__main__":
    models = ['model1.h5', 'model2.h5', 'model3.h5', 'model4.h5', 'model5.h5']
    dir = os.listdir(".")
    if models[0] not in dir:
        createModels(models)

    # Shared array input
    ub = 100
    testShape = x_train[:ub].shape
    input_base = multiprocessing.Array(ctypes.c_double,
                                       int(np.prod(testShape)), lock=False)
    input = np.ctypeslib.as_array(input_base)
    input = input.reshape(testShape)
    input[:ub] = x_train[:ub]

    # with multiprocessing.Pool() as p: #Use me for python 3
    p = multiprocessing.Pool() #Use me for python 2.7
    start_time = time.time()
    res = p.map(prediction, models)
    p.close() #Use me for python 2.7
    print('Total time taken: {}'.format(time.time() - start_time))
    print(res)
Hope this helps.