Distributed TensorFlow 1.0 Supervisor stuck if logdir is in HDFS
I built a TF 1.0 binary for CPU on CentOS 8. My distributed training code for the MNIST data works fine when the Supervisor's logdir is on local disk, but if I change the Supervisor's logdir to HDFS, the code hangs during the Supervisor's initialization:
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                         logdir='hdfs://cdh-2:8020/tmp/example',
                         global_step=global_step,
                         init_op=init_op)
I attached gdb to the hung process and captured the C stack trace; it appears to be stuck inside _wrap_RecursivelyCreateDir().
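For completeness, the trace below was captured roughly like this (the pid placeholder stands for whatever ps reports for the stuck example.py process):

# attach gdb to the hung worker and dump the native stack
gdb -p <pid>
(gdb) bt

The resulting backtrace: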
#0 0x00007f180cda9f3c in tensorflow::io::internal::SplitPath(tensorflow::StringPiece) ()
from /opt/niara/analyzer/lib/python2.7/site-packages/tensorflow/python/_pywrap_tensorflow.so
#1 0x00007f180cdaa01a in tensorflow::io::Dirname(tensorflow::StringPiece) () from /opt/niara/analyzer/lib/python2.7/site-packages/tensorflow/python/_pywrap_tensorflow.so
#2 0x00007f180cdc0f89 in tensorflow::FileSystem::RecursivelyCreateDir(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) ()
from /opt/niara/analyzer/lib/python2.7/site-packages/tensorflow/python/_pywrap_tensorflow.so
#3 0x00007f180cdbe780 in tensorflow::Env::RecursivelyCreateDir(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) ()
from /opt/niara/analyzer/lib/python2.7/site-packages/tensorflow/python/_pywrap_tensorflow.so
#4 0x00007f180b5be41a in RecursivelyCreateDir(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, TF_Status*) ()
from /opt/niara/analyzer/lib/python2.7/site-packages/tensorflow/python/_pywrap_tensorflow.so
#5 0x00007f180b5be538 in _wrap_RecursivelyCreateDir () from /opt/niara/analyzer/lib/python2.7/site-packages/tensorflow/python/_pywrap_tensorflow.so
#6 0x00007f1822221213 in PyEval_EvalFrameEx () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#7 0x00007f1822222d2e in PyEval_EvalCodeEx () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#8 0x00007f182222136a in PyEval_EvalFrameEx () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#9 0x00007f1822222d2e in PyEval_EvalCodeEx () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#10 0x00007f18221a07f1 in ?? () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#11 0x00007f1822171293 in PyObject_Call () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#12 0x00007f18221838cf in ?? () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#13 0x00007f1822171293 in PyObject_Call () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#14 0x00007f18221dd660 in ?? () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#15 0x00007f18221d41d8 in ?? () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#16 0x00007f1822171293 in PyObject_Call () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#17 0x00007f1822220422 in PyEval_EvalFrameEx () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#18 0x00007f1822222d2e in PyEval_EvalCodeEx () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#19 0x00007f18221a07f1 in ?? () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#20 0x00007f1822171293 in PyObject_Call () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#21 0x00007f18221838cf in ?? () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#22 0x00007f1822171293 in PyObject_Call () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#23 0x00007f18221dd660 in ?? () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#24 0x00007f18221d41d8 in ?? () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#25 0x00007f1822171293 in PyObject_Call () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#26 0x00007f1822220422 in PyEval_EvalFrameEx () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#27 0x00007f1822222d2e in PyEval_EvalCodeEx () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#28 0x00007f18221a08f8 in ?? () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#29 0x00007f1822171293 in PyObject_Call () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#30 0x00007f18221838cf in ?? () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#31 0x00007f1822171293 in PyObject_Call () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#32 0x00007f18221dd660 in ?? () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#33 0x00007f18221d41d8 in ?? () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#34 0x00007f1822171293 in PyObject_Call () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#35 0x00007f1822220422 in PyEval_EvalFrameEx () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#36 0x00007f1822222d2e in PyEval_EvalCodeEx () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#37 0x00007f1822222e42 in PyEval_EvalCode () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#38 0x00007f1822242c60 in PyRun_FileExFlags () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#39 0x00007f1822242e3f in PyRun_SimpleFileExFlags () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#40 0x00007f18222586f4 in Py_Main () from /opt/niara/analyzer/lib/libpython2.7.so.1.0
#41 0x000000393ae1ed1d in __libc_start_main () from /lib64/libc.so.6
#42 0x0000000000400649 in _start ()
The training code is as follows:
'''
pc-01$ python example.py --job_name="ps" --task_index=0
pc-02$ python example.py --job_name="worker" --task_index=0
pc-03$ python example.py --job_name="worker" --task_index=1
pc-04$ python example.py --job_name="worker" --task_index=2
'''
from __future__ import print_function
import tensorflow as tf
import time

# cluster specification
parameter_servers = ["an-node:2222"]
workers = ["an-node:2223",
           "cdh-2:2222",
           "cdh-3:2222"]
cluster = tf.train.ClusterSpec({"ps": parameter_servers, "worker": workers})

# input flags
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS

# start a server for a specific task
server = tf.train.Server(cluster,
                         job_name=FLAGS.job_name,
                         task_index=FLAGS.task_index)

# config
batch_size = 100
learning_rate = 0.0005
training_epochs = 20
logs_path = "/tmp/mnist/1"

# load mnist data set
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)

if FLAGS.job_name == "ps":
    server.join()
elif FLAGS.job_name == "worker":

    # Between-graph replication
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % FLAGS.task_index,
            cluster=cluster)):

        # count the number of updates
        global_step = tf.get_variable('global_step', [],
                                      initializer=tf.constant_initializer(0),
                                      trainable=False)

        # input images
        with tf.name_scope('input'):
            # None -> batch size can be any size, 784 -> flattened mnist image
            x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
            # target 10 output classes
            y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")

        # model parameters will change during training so we use tf.Variable
        tf.set_random_seed(1)
        with tf.name_scope("weights"):
            W1 = tf.Variable(tf.random_normal([784, 100]))
            W2 = tf.Variable(tf.random_normal([100, 10]))

        # bias
        with tf.name_scope("biases"):
            b1 = tf.Variable(tf.zeros([100]))
            b2 = tf.Variable(tf.zeros([10]))

        # implement model
        with tf.name_scope("softmax"):
            # y is our prediction
            z2 = tf.add(tf.matmul(x, W1), b1)
            a2 = tf.nn.sigmoid(z2)
            z3 = tf.add(tf.matmul(a2, W2), b2)
            y = tf.nn.softmax(z3)

        # specify cost function
        with tf.name_scope('cross_entropy'):
            # this is our cost
            cross_entropy = tf.reduce_mean(
                -tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))

        # specify optimizer
        with tf.name_scope('train'):
            # optimizer is an "operation" which we can execute in a session
            grad_op = tf.train.GradientDescentOptimizer(learning_rate)
            train_op = grad_op.minimize(cross_entropy, global_step=global_step)

        with tf.name_scope('Accuracy'):
            # accuracy
            correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

        # create a summary for our cost and accuracy
        tf.summary.scalar("cost", cross_entropy)
        tf.summary.scalar("accuracy", accuracy)

        # merge all summaries into a single "operation" which we can execute in a session
        summary_op = tf.summary.merge_all()
        init_op = tf.initialize_all_variables()
        print("Variables initialized ...")

    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             logdir='hdfs://cdh-2:8020/user/niara/example',
                             global_step=global_step,
                             init_op=init_op)

    begin_time = time.time()
    frequency = 100
    with sv.prepare_or_wait_for_session(server.target) as sess:
        # create log writer object (this will log on every machine)
        writer = tf.summary.FileWriter(logs_path, graph=tf.get_default_graph())

        # perform training cycles
        start_time = time.time()
        for epoch in range(training_epochs):
            # number of batches in one epoch
            batch_count = int(mnist.train.num_examples / batch_size)
            count = 0
            for i in range(batch_count):
                batch_x, batch_y = mnist.train.next_batch(batch_size)

                # perform the operations we defined earlier on batch
                _, cost, summary, step = sess.run(
                    [train_op, cross_entropy, summary_op, global_step],
                    feed_dict={x: batch_x, y_: batch_y})
                writer.add_summary(summary, step)

                count += 1
                if count % frequency == 0 or i + 1 == batch_count:
                    elapsed_time = time.time() - start_time
                    start_time = time.time()
                    print("Step: %d," % (step + 1),
                          " Epoch: %2d," % (epoch + 1),
                          " Batch: %3d of %3d," % (i + 1, batch_count),
                          " Cost: %.4f," % cost,
                          " AvgTime: %3.2fms" % float(elapsed_time * 1000 / frequency))
                    count = 0

        print("Test-Accuracy: %2.2f" % sess.run(
            accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
        print("Total Time: %3.2fs" % float(time.time() - begin_time))
        print("Final Cost: %.4f" % cost)

    sv.stop()
    print("done")
I finally solved this problem. The "How to run TensorFlow on Hadoop" page states that the paths to both libjvm.so and libhdfs.so should be on LD_LIBRARY_PATH. However, in the Hadoop distribution I use, libhdfs.so is not in the $HADOOP_HDFS_HOME/lib/native folder. After locating libhdfs.so on my cluster and appending its directory to LD_LIBRARY_PATH, the problem was solved.
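For anyone hitting the same hang, the environment setup looks roughly like the sketch below. The exact paths are assumptions, not what I used verbatim: where libjvm.so and libhdfs.so live depends on your JDK and Hadoop distribution, so substitute whatever find reports on your machines.

# find where this Hadoop distribution actually keeps libhdfs.so
find / -name 'libhdfs.so*' 2>/dev/null

# per the "How to run TensorFlow on Hadoop" guide, both libjvm.so and
# libhdfs.so must be resolvable at runtime (paths below are examples)
export JAVA_HOME=/usr/java/default
export HADOOP_HDFS_HOME=/usr/lib/hadoop-hdfs
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:${JAVA_HOME}/jre/lib/amd64/server   # libjvm.so
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:/usr/lib64                          # wherever libhdfs.so was found

# the same guide also asks for the Hadoop classpath when launching the job
CLASSPATH=$(${HADOOP_HDFS_HOME}/bin/hadoop classpath --glob) \
    python example.py --job_name="worker" --task_index=0

A quick sanity check is to call something like tf.gfile.Exists('hdfs://cdh-2:8020/tmp') in a Python shell: if it returns instead of hanging, the HDFS client libraries are being found.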