Asynchronous computation in TensorFlow
Recently I have been toying with TensorFlow, and I noticed that the framework is unable to use all of my available computational resources. In the Convolutional Neural Networks tutorial they mention that:
Naively employing asynchronous updates of model parameters leads to sub-optimal training performance because an individual model replica might be trained on a stale copy of the model parameters. Conversely, employing fully synchronous updates will be as slow as the slowest model replica.
Although they mention it both in the tutorial and in the whitepaper, I did not really find a way to do asynchronous parallel computation on a local machine. Is it even possible? Or is it part of the distributed, to-be-released version of TensorFlow? If it is, then how?
The open-source release of TensorFlow supports asynchronous gradient descent without even modifying the graph. The easiest way to do it is to execute multiple concurrent steps in parallel:
import threading
import tensorflow as tf

loss = ...

# Any of the optimizer classes can be used here.
train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)

sess = tf.Session()
sess.run(tf.initialize_all_variables())

def train_function():
  # TODO: Better termination condition, e.g. using a `max_steps` counter.
  while True:
    sess.run(train_op)

# Create multiple threads to run `train_function()` in parallel.
train_threads = []
for _ in range(NUM_CONCURRENT_STEPS):
  train_threads.append(threading.Thread(target=train_function))

# Start the threads, and block on their completion.
for t in train_threads:
  t.start()
for t in train_threads:
  t.join()
This example sets up NUM_CONCURRENT_STEPS concurrent calls to sess.run(train_op). Since there is no coordination between these threads, they proceed asynchronously.
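As a minimal sketch of the `max_steps` idea mentioned in the TODO comment above, the worker function could stop after a fixed number of total steps. The counter, the lock, and the MAX_STEPS constant are my own assumptions for illustration, not part of the original answer; the sketch reuses sess and train_op from the block above.

MAX_STEPS = 1000000  # hypothetical cap on the total number of training steps

step_lock = threading.Lock()
steps_taken = [0]  # shared mutable counter, guarded by `step_lock`

def train_function():
  while True:
    with step_lock:
      if steps_taken[0] >= MAX_STEPS:
        return
      steps_taken[0] += 1
    sess.run(train_op)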
Implementing synchronous parallel training is (currently) more challenging, because it requires additional coordination to ensure that all replicas read the same version of the parameters, and that all of their updates become visible at the same time. The multi-GPU example for CIFAR-10 training performs synchronous updates by making multiple copies of the "tower" in the training graph with shared parameters, and explicitly averaging the gradients across the towers before applying the update. A rough sketch of that averaging pattern is shown below.
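For intuition, here is a simplified sketch of the synchronous pattern described above, assuming the per-tower losses have already been built into a list called tower_losses (that name, and the use of tf.add_n for averaging, are illustrative choices of mine, not code from the CIFAR-10 example itself):

opt = tf.train.GradientDescentOptimizer(0.01)

# One list of (gradient, variable) pairs per tower; the towers share variables.
tower_grads = [opt.compute_gradients(loss) for loss in tower_losses]

# Average each shared variable's gradient across the towers.
averaged_grads = []
for grad_and_vars in zip(*tower_grads):
  grads = [g for g, _ in grad_and_vars]
  var = grad_and_vars[0][1]
  averaged_grads.append((tf.add_n(grads) / float(len(grads)), var))

# A single op that applies the averaged update to the shared parameters,
# so all towers' contributions become visible at the same time.
apply_op = opt.apply_gradients(averaged_grads)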
N.B. The code in this answer places all computation on the same device, which will not be optimal if you have multiple GPUs in your machine. If you want to use all of your GPUs, follow the example of the multi-GPU CIFAR-10 model and create multiple "towers" with their operations pinned to each GPU. The code would look roughly as follows:
train_ops = []
for i in range(NUM_GPUS):
  with tf.device("/gpu:%d" % i):
    # Define a tower on GPU `i`.
    loss = ...
    train_ops.append(tf.train.GradientDescentOptimizer(0.01).minimize(loss))

def train_function(train_op):
  # TODO: Better termination condition, e.g. using a `max_steps` counter.
  while True:
    sess.run(train_op)

# Create one thread per tower to run `train_function()` in parallel.
train_threads = []
for train_op in train_ops:
  train_threads.append(threading.Thread(target=train_function, args=(train_op,)))

# Start the threads, and block on their completion.
for t in train_threads:
  t.start()
for t in train_threads:
  t.join()
Note that you might find it convenient to use "variable scope" to facilitate sharing variables between the towers.
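As a small illustration of the variable-scope approach, the towers can call tf.get_variable inside a shared scope so they all read and update the same parameters. The scope name "tower", the variable shapes, and the tower_inputs list are placeholders I chose for the sketch:

def build_tower(tower_input):
  # `tf.get_variable` returns the existing variable when the enclosing
  # scope has reuse enabled, so all towers share the same parameters.
  w = tf.get_variable("weights", shape=[784, 10])
  b = tf.get_variable("biases", shape=[10])
  return tf.matmul(tower_input, w) + b

tower_outputs = []
for i in range(NUM_GPUS):
  with tf.device("/gpu:%d" % i):
    # The first tower creates the variables; later towers reuse them.
    with tf.variable_scope("tower", reuse=(i > 0)):
      tower_outputs.append(build_tower(tower_inputs[i]))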