Tensorflow Variables are Not Initialized using Between-graph Replication
I have the following Python code test.py, which uses "Between-graph Replication" for distributed TensorFlow:
import argparse
import logging

import tensorflow as tf

log = logging.getLogger(__name__)

# Job Names
PARAMETER_SERVER = "ps"
WORKER_SERVER = "worker"

# Cluster Details
CLUSTER_SPEC = {
    PARAMETER_SERVER: ["localhost:2222"],
    WORKER_SERVER: ["localhost:1111", "localhost:1112"]}


def parse_command_arguments():
    """ Set up and parse the command line arguments passed for experiment. """
    parser = argparse.ArgumentParser(
        description="Parameters and Arguments for the Test.")
    parser.add_argument(
        "--job_name",
        type=str,
        default="",
        help="One of 'ps', 'worker'"
    )
    # Flags for defining the tf.train.Server
    parser.add_argument(
        "--task_index",
        type=int,
        default=0,
        help="Index of task within the job"
    )

    return parser.parse_args()


def start_server(job_name, task_index):
    """ Create a server based on a cluster spec. """
    cluster = tf.train.ClusterSpec(CLUSTER_SPEC)
    server = tf.train.Server(
        cluster, job_name=job_name, task_index=task_index)

    return server, cluster


def model():
    """ Build up a simple estimator model. """
    # Build a linear model and predict values
    W = tf.Variable([.3], tf.float32)
    b = tf.Variable([-.3], tf.float32)
    x = tf.placeholder(tf.float32)
    linear_model = W * x + b
    y = tf.placeholder(tf.float32)
    global_step = tf.get_variable('global_step', [],
                                  initializer=tf.constant_initializer(0),
                                  trainable=False)

    # Loss sub-graph
    loss = tf.reduce_sum(tf.square(linear_model - y))

    # optimizer
    optimizer = tf.train.GradientDescentOptimizer(0.01)
    train = optimizer.minimize(loss, global_step=global_step)

    init_op = tf.global_variables_initializer()
    log.info("Variables initialized ...")

    return W, b, loss, x, y, train, global_step, init_op


if __name__ == "__main__":
    # Initializing logging with level "INFO".
    logging.basicConfig(level=logging.INFO)

    # Parse arguments from command line.
    arguments = parse_command_arguments()
    job_name = arguments.job_name
    task_index = arguments.task_index

    # Start a server.
    server, cluster = start_server(job_name, task_index)

    if job_name == "ps":
        server.join()
    else:
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % task_index,
                cluster=cluster)):
            W, b, loss, x, y, train, global_step, init_op = model()

        with tf.train.MonitoredTrainingSession(
                master=server.target,
                is_chief=(arguments.task_index == 0 and (
                    arguments.job_name == 'worker'))) as sess:
            step = 0
            # training data
            x_train = [1, 2, 3, 4]
            y_train = [0, -1, -2, -3]
            while not sess.should_stop() and step < 1000:
                _, step = sess.run(
                    [train, global_step], {x: x_train, y: y_train})

            # evaluate training accuracy
            curr_W, curr_b, curr_loss = sess.run(
                [W, b, loss], {x: x_train, y: y_train})
            print("W: %s b: %s loss: %s" % (curr_W, curr_b, curr_loss))
I ran the code in 3 separate processes on a single machine (a CPU-only MacPro), in the following order:
- Parameter server:
$ python test.py --task_index 0 --job_name ps
- Worker 1:
$ python test.py --task_index 0 --job_name worker
- Worker 2:
$ python test.py --task_index 1 --job_name worker
And I found that the "Worker 2" process hit an error:
$ python test.py --task_index 1 --job_name worker
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:197] Initialize GrpcChannelCache for job ps -> {0 -> localhost:2222}
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:197] Initialize GrpcChannelCache for job worker -> {0 -> localhost:1111, 1 -> localhost:1112}
I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:211] Started server with target: grpc://localhost:1112
INFO:__main__:Variables initialized ...
I tensorflow/core/distributed_runtime/master_session.cc:993] Start master session 9912c75f2921fe13 with config:
INFO:tensorflow:Waiting for model to be ready. Ready_for_local_init_op: None, ready: Variables not initialized: Variable, Variable_1, global_step
INFO:tensorflow:Waiting for model to be ready. Ready_for_local_init_op: None, ready: Variables not initialized: Variable, Variable_1, global_step
"Worker 2" 的进程就在那里被冻结了。该错误显示 "Worker 2" 的 Tensorflow 变量未成功初始化,所以我想知道 MonitoredTrainingSession
在跨 Tensorflow 会话或其他地方协调变量初始化方面是否存在错误,或者我在代码中遗漏了一些东西。
NOTE: The code was running with Tensorflow 0.12
I think this is the "intended behavior" of the tf.train.MonitoredTrainingSession coordination protocol. In a recent answer I explained how that protocol is geared toward long-running training jobs, so a worker will sleep for 30 seconds between checks of whether the variables have been initialized.
There is a race condition between Worker 1 running the initialization op and Worker 2 checking the variables: if Worker 2 "wins" the race, it observes that some variables are not yet initialized, and it goes into a 30-second sleep before checking again.
However, the total amount of computation in your program is so small that Worker 1 can finish its work and exit within that 30-second window. When Worker 2 then checks whether the variables are initialized, it creates a new tf.Session that tries to connect to the other tasks, but Worker 1 is no longer running, so you will see a log message like this (repeated every 10 seconds or so):
I tensorflow/core/distributed_runtime/master.cc:193] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
This will not be a problem when the training job runs for significantly longer than 30 seconds.
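As a side illustration of this diagnosis (not part of the original answer), a minimal sketch using the same names as test.py above: if the chief worker keeps its in-process server alive after training, for example by joining the server instead of letting the process exit, then /job:worker/task:0 stays reachable and Worker 2 can finish after its 30-second sleep:
# Hypothetical variation of the worker branch in test.py, for illustration only.
is_chief = (arguments.task_index == 0 and arguments.job_name == 'worker')

with tf.train.MonitoredTrainingSession(
        master=server.target, is_chief=is_chief) as sess:
    # ... run the same training loop as in test.py above ...
    pass

if is_chief:
    # Block here instead of exiting, so /job:worker/task:0 stays reachable
    # while Worker 2 waits for the variables to become initialized.
    server.join()
This only papers over the race in this toy example; the device-filter workaround below removes the dependency on the other worker altogether.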
One workaround is to remove the interdependence between the workers by setting a "device filter". Since the individual workers do not communicate with each other in a typical between-graph configuration, you can tell TensorFlow to ignore the absence of the other worker at session-creation time, using a tf.ConfigProto:
# Each worker only needs to contact the PS task(s) and the local worker task.
config = tf.ConfigProto(device_filters=[
    '/job:ps', '/job:worker/task:%d' % arguments.task_index])

with tf.train.MonitoredTrainingSession(
        master=server.target,
        config=config,
        is_chief=(arguments.task_index == 0 and (
            arguments.job_name == 'worker'))) as sess:
    # ...
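With this filter in place, session creation on each worker only waits for the ps task and the worker's own task, so Worker 2 no longer blocks on Worker 1 having already exited.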