使用 concurrent.futures 对多个 tensorflow.keras 模型并行预测
Predicting in parallel using concurrent.futures of tensorflow.keras models
我正在尝试使用 concurrent.futures
实现一些并行作业。每个工作人员都需要一份 TensorFlow 模型和一些数据。我通过以下方式实现它(MWE)
import tensorflow as tf
from tensorflow import keras
import numpy as np
import concurrent.futures
import time
def simple_model():
    """Build and compile a minimal two-layer Keras model for the demo."""
    net = keras.models.Sequential([
        keras.layers.Dense(units=10, input_shape=[1]),
        keras.layers.Dense(units=1, activation='sigmoid'),
    ])
    net.compile(optimizer='sgd', loss='mean_squared_error')
    return net
def clone_model(model):
    """Return a fresh copy of *model* with identical architecture and weights."""
    duplicate = tf.keras.models.clone_model(model)
    duplicate.set_weights(model.get_weights())
    return duplicate
def work(model, seq):
    """Run one prediction job: apply *model* to the input batch *seq*."""
    prediction = model.predict(seq)
    return prediction
def worker(model, num_of_seq = 4):
    """Fan out predictions of *model* over `num_of_seq` input rows via a process pool.

    NOTE(review): this is the broken MWE from the question — submitting a live
    Keras model to ProcessPoolExecutor requires pickling it for the child
    process, which is what makes the program hang. Kept as-is for context.
    """
    # NOTE(review): np.arange(0, 100) reshapes cleanly only when
    # 100 % num_of_seq == 0 (e.g. the default 4); other values would raise.
    seqences = np.arange(0,100).reshape(num_of_seq, -1)
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        t0 = time.perf_counter()
        # One independent clone per sequence so workers do not share a model.
        model_list = [clone_model(model) for _ in range(num_of_seq)]
        # Map each future back to the input row it was given.
        future_to_samples = {executor.submit(work, model, seq): seq for model, seq in zip(model_list, seqences)}
        Seq_out = []
        # Collect results as they finish; completion order is arbitrary.
        for future in concurrent.futures.as_completed(future_to_samples):
            out = future.result()
            Seq_out.append(out)
        t1 = time.perf_counter()
        print(t1-t0)
    return np.reshape(Seq_out, (-1, )), t1-t0
if __name__ == '__main__':
    # Build the model in the parent process, then dispatch predictions.
    model = simple_model()
    out = worker(model, num_of_seq=4)
    print(out)
simple_model()
创建模型。 clone_model
克隆一个 TensorFlow 模型。 work
表示可能工作的 MWE。 worker
并行分配 work
。
这不起作用,它只是卡住了,没有产生任何输出。但是,如果我将 ProcessPoolExecutor
替换为 ThreadPoolExecutor
，则上述代码有效。但不提供任何加速（可能是因为 workers 并非真正并行运行）。
根据我的理解,错误在于参数 model
future_to_samples = {executor.submit(work, model, seq): seq for model, seq in zip(model_list, seqences)}
.
我修改了代码,使其将模型的路径而不是模型本身发送到子进程。并且有效。
import tensorflow as tf
from tensorflow import keras
import numpy as np
import concurrent.futures
import time
# gpus = tf.config.experimental.list_physical_devices('GPU')
# if len(gpus) > 0:
# print(f'GPUs {gpus}')
# try: tf.config.experimental.set_memory_growth(gpus[0], True)
# except RuntimeError: pass
def simple_model():
    """Create and compile the small demonstration network."""
    layers = [
        keras.layers.Dense(units=10, input_shape=[1]),
        keras.layers.Dense(units=1, activation='sigmoid'),
    ]
    net = keras.models.Sequential(layers)
    net.compile(optimizer='sgd', loss='mean_squared_error')
    return net
def clone_model(model):
    """Duplicate *model* (same architecture) and copy its weights over."""
    copy_of_model = tf.keras.models.clone_model(model)
    copy_of_model.set_weights(model.get_weights())
    return copy_of_model
def work(model_path, seq):
    """Reload the model saved at *model_path* and predict on *seq*.

    Loading inside the child process sidesteps pickling a live Keras
    model across the process boundary — only the path string is sent.
    """
    restored = tf.keras.models.load_model(model_path)
    return restored.predict(seq)
def worker(model, num_of_seq = 4):
    """Save *model* to disk once, then let each subprocess reload it and predict.

    Returns a tuple of (flattened predictions, elapsed seconds).
    """
    seqences = np.arange(0,num_of_seq*10).reshape(num_of_seq, -1)
    # Persist the model; only the (picklable) path string crosses processes.
    model_savepath = './simple_model.h5'
    model.save(model_savepath)
    path_list = [model_savepath for _ in range(num_of_seq)]
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        t0 = time.perf_counter()
        # model_list = [clone_model(model) for _ in range(num_of_seq)]
        # NOTE(review): index_list is never used below — leftover from debugging.
        index_list = np.arange(1, num_of_seq)# [clone_model(model) for _ in range(num_of_seq)]
        # print(model_list)
        # Map each future back to the input row it was given.
        future_to_samples = {executor.submit(work, path, seq): seq for path, seq in zip(path_list,seqences)}
        Seq_out = []
        # Collect results as workers finish; completion order is arbitrary.
        for future in concurrent.futures.as_completed(future_to_samples):
            out = future.result()
            Seq_out.append(out)
        t1 = time.perf_counter()
        print(t1-t0)
    return np.reshape(Seq_out, (-1, )), t1-t0
if __name__ == '__main__':
    # Build the model once in the parent, then fan out 400 prediction jobs.
    model = simple_model()
    num_of_seq = 400
    # model_list = [clone_model(model) for _ in range(4)]
    out = worker(model, num_of_seq=num_of_seq)
    print(out)
我正在尝试使用 concurrent.futures
实现一些并行作业。每个工作人员都需要一份 TensorFlow 模型和一些数据。我通过以下方式实现它(MWE)
import tensorflow as tf
from tensorflow import keras
import numpy as np
import concurrent.futures
import time
def simple_model():
    """Build and compile a minimal two-layer Keras model for the demo."""
    net = keras.models.Sequential([
        keras.layers.Dense(units=10, input_shape=[1]),
        keras.layers.Dense(units=1, activation='sigmoid'),
    ])
    net.compile(optimizer='sgd', loss='mean_squared_error')
    return net
def clone_model(model):
    """Return a fresh copy of *model* with identical architecture and weights."""
    duplicate = tf.keras.models.clone_model(model)
    duplicate.set_weights(model.get_weights())
    return duplicate
def work(model, seq):
    """Run one prediction job: apply *model* to the input batch *seq*."""
    prediction = model.predict(seq)
    return prediction
def worker(model, num_of_seq = 4):
    """Fan out predictions of *model* over `num_of_seq` input rows via a process pool.

    NOTE(review): this is the broken MWE from the question — submitting a live
    Keras model to ProcessPoolExecutor requires pickling it for the child
    process, which is what makes the program hang. Kept as-is for context.
    """
    # NOTE(review): np.arange(0, 100) reshapes cleanly only when
    # 100 % num_of_seq == 0 (e.g. the default 4); other values would raise.
    seqences = np.arange(0,100).reshape(num_of_seq, -1)
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        t0 = time.perf_counter()
        # One independent clone per sequence so workers do not share a model.
        model_list = [clone_model(model) for _ in range(num_of_seq)]
        # Map each future back to the input row it was given.
        future_to_samples = {executor.submit(work, model, seq): seq for model, seq in zip(model_list, seqences)}
        Seq_out = []
        # Collect results as they finish; completion order is arbitrary.
        for future in concurrent.futures.as_completed(future_to_samples):
            out = future.result()
            Seq_out.append(out)
        t1 = time.perf_counter()
        print(t1-t0)
    return np.reshape(Seq_out, (-1, )), t1-t0
if __name__ == '__main__':
    # Build the model in the parent process, then dispatch predictions.
    model = simple_model()
    out = worker(model, num_of_seq=4)
    print(out)
simple_model()
创建模型。 clone_model
克隆一个 TensorFlow 模型。 work
表示可能工作的 MWE。 worker
并行分配 work
。
这不起作用,它只是卡住了,没有产生任何输出。但是,如果我将 ProcessPoolExecutor
替换为 ThreadPoolExecutor
，则上述代码有效。但不提供任何加速（可能是因为 workers 并非真正并行运行）。
根据我的理解,错误在于参数 model
future_to_samples = {executor.submit(work, model, seq): seq for model, seq in zip(model_list, seqences)}
.
我修改了代码,使其将模型的路径而不是模型本身发送到子进程。并且有效。
import tensorflow as tf
from tensorflow import keras
import numpy as np
import concurrent.futures
import time
# gpus = tf.config.experimental.list_physical_devices('GPU')
# if len(gpus) > 0:
# print(f'GPUs {gpus}')
# try: tf.config.experimental.set_memory_growth(gpus[0], True)
# except RuntimeError: pass
def simple_model():
    """Create and compile the small demonstration network."""
    layers = [
        keras.layers.Dense(units=10, input_shape=[1]),
        keras.layers.Dense(units=1, activation='sigmoid'),
    ]
    net = keras.models.Sequential(layers)
    net.compile(optimizer='sgd', loss='mean_squared_error')
    return net
def clone_model(model):
    """Duplicate *model* (same architecture) and copy its weights over."""
    copy_of_model = tf.keras.models.clone_model(model)
    copy_of_model.set_weights(model.get_weights())
    return copy_of_model
def work(model_path, seq):
    """Reload the model saved at *model_path* and predict on *seq*.

    Loading inside the child process sidesteps pickling a live Keras
    model across the process boundary — only the path string is sent.
    """
    restored = tf.keras.models.load_model(model_path)
    return restored.predict(seq)
def worker(model, num_of_seq = 4):
    """Save *model* to disk once, then let each subprocess reload it and predict.

    Returns a tuple of (flattened predictions, elapsed seconds).
    """
    seqences = np.arange(0,num_of_seq*10).reshape(num_of_seq, -1)
    # Persist the model; only the (picklable) path string crosses processes.
    model_savepath = './simple_model.h5'
    model.save(model_savepath)
    path_list = [model_savepath for _ in range(num_of_seq)]
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        t0 = time.perf_counter()
        # model_list = [clone_model(model) for _ in range(num_of_seq)]
        # NOTE(review): index_list is never used below — leftover from debugging.
        index_list = np.arange(1, num_of_seq)# [clone_model(model) for _ in range(num_of_seq)]
        # print(model_list)
        # Map each future back to the input row it was given.
        future_to_samples = {executor.submit(work, path, seq): seq for path, seq in zip(path_list,seqences)}
        Seq_out = []
        # Collect results as workers finish; completion order is arbitrary.
        for future in concurrent.futures.as_completed(future_to_samples):
            out = future.result()
            Seq_out.append(out)
        t1 = time.perf_counter()
        print(t1-t0)
    return np.reshape(Seq_out, (-1, )), t1-t0
if __name__ == '__main__':
    # Build the model once in the parent, then fan out 400 prediction jobs.
    model = simple_model()
    num_of_seq = 400
    # model_list = [clone_model(model) for _ in range(4)]
    out = worker(model, num_of_seq=num_of_seq)
    print(out)