在 TensorFlow 中关闭服务器
Shut down server in TensorFlow
当我们要使用分布式TensorFlow时,我们将使用
创建一个参数服务器
tf.train.Server.join()
但是,除了杀死进程之外,我找不到任何关闭服务器的方法。 join() 的 TensorFlow 文档是
Blocks until the server has shut down.
This method currently blocks forever.
这让我很烦恼,因为我想创建许多服务器用于计算并在一切完成后关闭它们。
是否有可能的解决方案。
谢谢
目前没有关闭 TensorFlow gRPC 服务器的干净方法。它 可能 ,但安全地做到这一点需要对所有运行中的请求和响应缓冲区进行额外的内存管理,这将需要大量额外的管道(的最糟糕的一种:异步共享内存管理...) 直到现在还没有人要求的功能!
在实践中,您应该能够使用相同的 tf.train.Server
对象进行许多不同的计算。如果这对您的用例不起作用,请随时 open an GitHub issue 并告诉我们有关您的用例的更多信息。
您可以通过使用 session.run(dequeue_op)
而不是 server.join()
让参数服务器进程按需终止,并让另一个进程在您希望此进程终止时将某些内容加入该队列。
因此,对于 k
参数服务器分片,您可以创建 k
队列,具有唯一的 shared_name
属性 并尝试从该队列 dequeue
。当您想要关闭服务器时,您可以遍历所有队列并 enqueue
一个令牌到每个队列。这将导致 session.run
解除阻塞,并且 Python 进程将 运行 结束并退出,从而关闭服务器。
下面是一个独立的示例,其中包含 2 个分片,取自:
https://gist.github.com/yaroslavvb/82a5b5302449530ca5ff59df520c369e
(对于多 worker/multi 分片示例,请参阅 https://gist.github.com/yaroslavvb/ea1b1bae0a75c4aae593df7eca72d9ca)
import subprocess
import tensorflow as tf
import time
import sys
flags = tf.flags
flags.DEFINE_string("port1", "12222", "port of worker1")
flags.DEFINE_string("port2", "12223", "port of worker2")
flags.DEFINE_string("task", "", "internal use")
FLAGS = flags.FLAGS
# setup local cluster from flags
host = "127.0.0.1:"
cluster = {"worker": [host+FLAGS.port1, host+FLAGS.port2]}
clusterspec = tf.train.ClusterSpec(cluster).as_cluster_def()
if __name__=='__main__':
if not FLAGS.task: # start servers and run client
# launch distributed service
def runcmd(cmd): subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT)
runcmd("python %s --task=0"%(sys.argv[0]))
runcmd("python %s --task=1"%(sys.argv[0]))
time.sleep(1)
# bring down distributed service
sess = tf.Session("grpc://"+host+FLAGS.port1)
queue0 = tf.FIFOQueue(1, tf.int32, shared_name="queue0")
queue1 = tf.FIFOQueue(1, tf.int32, shared_name="queue1")
with tf.device("/job:worker/task:0"):
add_op0 = tf.add(tf.ones(()), tf.ones(()))
with tf.device("/job:worker/task:1"):
add_op1 = tf.add(tf.ones(()), tf.ones(()))
print("Running computation on server 0")
print(sess.run(add_op0))
print("Running computation on server 1")
print(sess.run(add_op1))
print("Bringing down server 0")
sess.run(queue0.enqueue(1))
print("Bringing down server 1")
sess.run(queue1.enqueue(1))
else: # Launch TensorFlow server
server = tf.train.Server(clusterspec, config=None,
job_name="worker",
task_index=int(FLAGS.task))
print("Starting server "+FLAGS.task)
sess = tf.Session(server.target)
queue = tf.FIFOQueue(1, tf.int32, shared_name="queue"+FLAGS.task)
sess.run(queue.dequeue())
print("Terminating server"+FLAGS.task)
此页面经常出现在 Google 上,所以我想我会尝试改进 ,为那些刚接触分布式 Tensorflow 的人提供我希望的更明确的答案。
import tensorflow as tf
import threading
def main(job_name, task):
cluster = tf.train.ClusterSpec({
'ps': ['localhost:22222', 'localhost:22223'],
'worker': ['localhost: 22224','localhost: 22225','localhost: 22226']
})
server = tf.train.Server(cluster, job_name=job_name, task_index=task)
if job_name == 'ps':
# create a shared queue on the parameter server which is visible on /job:ps/task:%d
with tf.device('/job:ps/task:%d' % task):
queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue%d' % task)
# wait for the queue to be filled
with tf.Session(server.target) as sess:
for i in range(cluster.num_tasks('worker')):
sess.run(queue.dequeue())
print('ps:%d received "done" from worker:%d' % (task, i))
print('ps:%d quitting' % task)
elif job_name == 'worker':
queues = []
# create a shared queue on the worker which is visible on /job:ps/task:%d
for i in range(cluster.num_tasks('ps')):
with tf.device('/job:ps/task:%d' % i):
queues.append(tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue%d' % i))
# fill the queue
with tf.Session(server.target) as sess:
for i in range(cluster.num_tasks('ps')):
_, size = sess.run([queues[i].enqueue(task), queues[i].size()])
print('Worker:%d sending "done" to ps:%d [elements=%d]' % (task, i, size))
if __name__ == '__main__':
threads = [
threading.Thread(target=main, args=('ps', 0)),
threading.Thread(target=main, args=('ps', 1)),
threading.Thread(target=main, args=('worker', 0)),
threading.Thread(target=main, args=('worker', 1)),
threading.Thread(target=main, args=('worker', 2))]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
在 "canonical" Distributed Tensorflow example 上扩展非常简单,只需用以下代码片段替换代码的工作部分:
# create a worker that does nothing
elif job_name == 'worker':
with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:%d' % task, cluster=cluster)):
global_step = tf.train.get_or_create_global_step()
no_op = tf.no_op()
done_ops = []
# create a shared queue on the worker which is visible on /job:ps/task:%d
for i in range(cluster.num_tasks('ps')):
with tf.device('/job:ps/task:%d' % i):
done_queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue' + str(i))
done_ops.append(done_queue.enqueue(task))
hooks=[tf.train.StopAtStepHook(last_step=1),
tf.train.FinalOpsHook([done_ops])]
with tf.train.MonitoredTrainingSession(master=server.target,
is_chief=(task == 0),
hooks=hooks) as sess:
sess.run([no_op])
请注意,MonitoredTrainingSession 版本在将所有工作人员连接在一起时似乎要慢得多。
当我们要使用分布式TensorFlow时,我们将使用
创建一个参数服务器tf.train.Server.join()
但是,除了杀死进程之外,我找不到任何关闭服务器的方法。 join() 的 TensorFlow 文档是
Blocks until the server has shut down.
This method currently blocks forever.
这让我很烦恼,因为我想创建许多服务器用于计算并在一切完成后关闭它们。
是否有可能的解决方案。
谢谢
目前没有关闭 TensorFlow gRPC 服务器的干净方法。它 可能
在实践中,您应该能够使用相同的 tf.train.Server
对象进行许多不同的计算。如果这对您的用例不起作用,请随时 open an GitHub issue 并告诉我们有关您的用例的更多信息。
您可以通过使用 session.run(dequeue_op)
而不是 server.join()
让参数服务器进程按需终止,并让另一个进程在您希望此进程终止时将某些内容加入该队列。
因此,对于 k
参数服务器分片,您可以创建 k
队列,具有唯一的 shared_name
属性 并尝试从该队列 dequeue
。当您想要关闭服务器时,您可以遍历所有队列并 enqueue
一个令牌到每个队列。这将导致 session.run
解除阻塞,并且 Python 进程将 运行 结束并退出,从而关闭服务器。
下面是一个独立的示例,其中包含 2 个分片,取自: https://gist.github.com/yaroslavvb/82a5b5302449530ca5ff59df520c369e
(对于多 worker/multi 分片示例,请参阅 https://gist.github.com/yaroslavvb/ea1b1bae0a75c4aae593df7eca72d9ca)
import subprocess
import tensorflow as tf
import time
import sys
flags = tf.flags
flags.DEFINE_string("port1", "12222", "port of worker1")
flags.DEFINE_string("port2", "12223", "port of worker2")
flags.DEFINE_string("task", "", "internal use")
FLAGS = flags.FLAGS
# setup local cluster from flags
host = "127.0.0.1:"
cluster = {"worker": [host+FLAGS.port1, host+FLAGS.port2]}
clusterspec = tf.train.ClusterSpec(cluster).as_cluster_def()
if __name__=='__main__':
if not FLAGS.task: # start servers and run client
# launch distributed service
def runcmd(cmd): subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT)
runcmd("python %s --task=0"%(sys.argv[0]))
runcmd("python %s --task=1"%(sys.argv[0]))
time.sleep(1)
# bring down distributed service
sess = tf.Session("grpc://"+host+FLAGS.port1)
queue0 = tf.FIFOQueue(1, tf.int32, shared_name="queue0")
queue1 = tf.FIFOQueue(1, tf.int32, shared_name="queue1")
with tf.device("/job:worker/task:0"):
add_op0 = tf.add(tf.ones(()), tf.ones(()))
with tf.device("/job:worker/task:1"):
add_op1 = tf.add(tf.ones(()), tf.ones(()))
print("Running computation on server 0")
print(sess.run(add_op0))
print("Running computation on server 1")
print(sess.run(add_op1))
print("Bringing down server 0")
sess.run(queue0.enqueue(1))
print("Bringing down server 1")
sess.run(queue1.enqueue(1))
else: # Launch TensorFlow server
server = tf.train.Server(clusterspec, config=None,
job_name="worker",
task_index=int(FLAGS.task))
print("Starting server "+FLAGS.task)
sess = tf.Session(server.target)
queue = tf.FIFOQueue(1, tf.int32, shared_name="queue"+FLAGS.task)
sess.run(queue.dequeue())
print("Terminating server"+FLAGS.task)
此页面经常出现在 Google 上,所以我想我会尝试改进
import tensorflow as tf
import threading
def main(job_name, task):
cluster = tf.train.ClusterSpec({
'ps': ['localhost:22222', 'localhost:22223'],
'worker': ['localhost: 22224','localhost: 22225','localhost: 22226']
})
server = tf.train.Server(cluster, job_name=job_name, task_index=task)
if job_name == 'ps':
# create a shared queue on the parameter server which is visible on /job:ps/task:%d
with tf.device('/job:ps/task:%d' % task):
queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue%d' % task)
# wait for the queue to be filled
with tf.Session(server.target) as sess:
for i in range(cluster.num_tasks('worker')):
sess.run(queue.dequeue())
print('ps:%d received "done" from worker:%d' % (task, i))
print('ps:%d quitting' % task)
elif job_name == 'worker':
queues = []
# create a shared queue on the worker which is visible on /job:ps/task:%d
for i in range(cluster.num_tasks('ps')):
with tf.device('/job:ps/task:%d' % i):
queues.append(tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue%d' % i))
# fill the queue
with tf.Session(server.target) as sess:
for i in range(cluster.num_tasks('ps')):
_, size = sess.run([queues[i].enqueue(task), queues[i].size()])
print('Worker:%d sending "done" to ps:%d [elements=%d]' % (task, i, size))
if __name__ == '__main__':
threads = [
threading.Thread(target=main, args=('ps', 0)),
threading.Thread(target=main, args=('ps', 1)),
threading.Thread(target=main, args=('worker', 0)),
threading.Thread(target=main, args=('worker', 1)),
threading.Thread(target=main, args=('worker', 2))]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
在 "canonical" Distributed Tensorflow example 上扩展非常简单,只需用以下代码片段替换代码的工作部分:
# create a worker that does nothing
elif job_name == 'worker':
with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:%d' % task, cluster=cluster)):
global_step = tf.train.get_or_create_global_step()
no_op = tf.no_op()
done_ops = []
# create a shared queue on the worker which is visible on /job:ps/task:%d
for i in range(cluster.num_tasks('ps')):
with tf.device('/job:ps/task:%d' % i):
done_queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue' + str(i))
done_ops.append(done_queue.enqueue(task))
hooks=[tf.train.StopAtStepHook(last_step=1),
tf.train.FinalOpsHook([done_ops])]
with tf.train.MonitoredTrainingSession(master=server.target,
is_chief=(task == 0),
hooks=hooks) as sess:
sess.run([no_op])
请注意,MonitoredTrainingSession 版本在将所有工作人员连接在一起时似乎要慢得多。