TensorFlow/Keras 多线程模型拟合
TensorFlow/Keras multi-threaded model fitting
我正在尝试使用多个线程(和 tensorflow
后端)训练具有不同参数值的多个 keras
模型。我见过几个在多个线程中使用相同模型的示例,但在这种特殊情况下,我 运行 遇到了关于冲突图等的各种错误。这是我希望能够实现的一个简单示例要做:
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import tensorflow as tf
from keras import backend as K
from keras.layers import Dense
from keras.models import Sequential
sess = tf.Session()
def example_model(size):
model = Sequential()
model.add(Dense(size, input_shape=(5,)))
model.add(Dense(1))
model.compile(optimizer='sgd', loss='mse')
return model
if __name__ == '__main__':
K.set_session(sess)
X = np.random.random((10, 5))
y = np.random.random((10, 1))
models = [example_model(i) for i in range(5, 10)]
e = ThreadPoolExecutor(4)
res_list = [e.submit(model.fit, X, y) for model in models]
for res in res_list:
print(res.result())
产生的错误是 ValueError: Tensor("Variable:0", shape=(5, 5), dtype=float32_ref) must be from the same graph as Tensor("Variable_2/read:0", shape=(), dtype=float32).
。我还尝试在线程中初始化模型,这会导致类似的失败。
关于解决此问题的最佳方法有什么想法吗?我完全不依赖于这个确切的结构,但我更愿意能够使用多线程而不是进程,因此所有模型都在相同的 GPU 内存分配中进行训练。
Tensorflow 图不是线程安全的(参见 https://www.tensorflow.org/api_docs/python/tf/Graph),当您创建新的 Tensorflow 会话时,它默认使用默认图。
您可以通过在并行函数中使用新图形创建新会话并在那里构建您的 keras 模型来解决这个问题。
下面是一些在每个可用的 gpu 上并行创建和拟合模型的代码:
import concurrent.futures
import numpy as np
import keras.backend as K
from keras.layers import Dense
from keras.models import Sequential
import tensorflow as tf
from tensorflow.python.client import device_lib
def get_available_gpus():
local_device_protos = device_lib.list_local_devices()
return [x.name for x in local_device_protos if x.device_type == 'GPU']
xdata = np.random.randn(100, 8)
ytrue = np.random.randint(0, 2, 100)
def fit(gpu):
with tf.Session(graph=tf.Graph()) as sess:
K.set_session(sess)
with tf.device(gpu):
model = Sequential()
model.add(Dense(12, input_dim=8, activation='relu'))
model.add(Dense(8, activation='relu'))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam')
model.fit(xdata, ytrue, verbose=0)
return model.evaluate(xdata, ytrue, verbose=0)
gpus = get_available_gpus()
with concurrent.futures.ThreadPoolExecutor(len(gpus)) as executor:
results = [x for x in executor.map(fit, gpus)]
print('results: ', results)
我正在尝试使用多个线程(和 tensorflow
后端)训练具有不同参数值的多个 keras
模型。我见过几个在多个线程中使用相同模型的示例,但在这种特殊情况下,我 运行 遇到了关于冲突图等的各种错误。这是我希望能够实现的一个简单示例要做:
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import tensorflow as tf
from keras import backend as K
from keras.layers import Dense
from keras.models import Sequential
sess = tf.Session()
def example_model(size):
model = Sequential()
model.add(Dense(size, input_shape=(5,)))
model.add(Dense(1))
model.compile(optimizer='sgd', loss='mse')
return model
if __name__ == '__main__':
K.set_session(sess)
X = np.random.random((10, 5))
y = np.random.random((10, 1))
models = [example_model(i) for i in range(5, 10)]
e = ThreadPoolExecutor(4)
res_list = [e.submit(model.fit, X, y) for model in models]
for res in res_list:
print(res.result())
产生的错误是 ValueError: Tensor("Variable:0", shape=(5, 5), dtype=float32_ref) must be from the same graph as Tensor("Variable_2/read:0", shape=(), dtype=float32).
。我还尝试在线程中初始化模型,这会导致类似的失败。
关于解决此问题的最佳方法有什么想法吗?我完全不依赖于这个确切的结构,但我更愿意能够使用多线程而不是进程,因此所有模型都在相同的 GPU 内存分配中进行训练。
Tensorflow 图不是线程安全的(参见 https://www.tensorflow.org/api_docs/python/tf/Graph),当您创建新的 Tensorflow 会话时,它默认使用默认图。
您可以通过在并行函数中使用新图形创建新会话并在那里构建您的 keras 模型来解决这个问题。
下面是一些在每个可用的 gpu 上并行创建和拟合模型的代码:
import concurrent.futures
import numpy as np
import keras.backend as K
from keras.layers import Dense
from keras.models import Sequential
import tensorflow as tf
from tensorflow.python.client import device_lib
def get_available_gpus():
local_device_protos = device_lib.list_local_devices()
return [x.name for x in local_device_protos if x.device_type == 'GPU']
xdata = np.random.randn(100, 8)
ytrue = np.random.randint(0, 2, 100)
def fit(gpu):
with tf.Session(graph=tf.Graph()) as sess:
K.set_session(sess)
with tf.device(gpu):
model = Sequential()
model.add(Dense(12, input_dim=8, activation='relu'))
model.add(Dense(8, activation='relu'))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam')
model.fit(xdata, ytrue, verbose=0)
return model.evaluate(xdata, ytrue, verbose=0)
gpus = get_available_gpus()
with concurrent.futures.ThreadPoolExecutor(len(gpus)) as executor:
results = [x for x in executor.map(fit, gpus)]
print('results: ', results)