GRPC causes training to pause in individual worker (distributed tensorflow, synchronised)

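I am trying to train a model in a synchronous, distributed fashion for data parallelism. My machine has 4 GPUs. Each GPU should run one worker that trains on a separate, non-overlapping subset of the data (between-graph replication). The main data file is split into 16 smaller TFRecord files, and each worker should process 4 different files. The problem is that training freezes in each worker process independently and at different times. Every worker eventually freezes at some point.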

One of the 'ps' tasks reports the following grpc-related errors:

2017-09-21 16:45:55.606842: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:2000, 1 -> localhost:2001, 2 -> localhost:2002}
2017-09-21 16:45:55.606877: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2003, 1 -> localhost:2004, 2 -> localhost:2005, 3 -> localhost:2006}
2017-09-21 16:45:55.608066: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:2002
E0921 16:48:52.596846076    3037 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=12325, new grpc_chttp2_stream id=12317
2017-09-21 16:48:57.497244: W tensorflow/core/framework/op_kernel.cc:1158] Out of range: End of sequence
     [[Node: data_source_task_index_0/IteratorGetNext = IteratorGetNext[output_shapes=[[-1,-1], [-1,-1], [-1,-1], [-1,-1], [-1,-1]], output_types=[DT_INT64, DT_INT64, DT_INT64, DT_INT64, DT_INT64], _device="/job:ps/replica:0/task:0/cpu:0"](data_source_task_index_0/Iterator)]]
     [[Node: data_source_task_index_0/cond/Merge_2_S341 = _Recv[client_terminated=false, recv_device="/job:ps/replica:0/task:2/cpu:0", send_device="/job:ps/replica:0/task:0/cpu:0", send_device_incarnation=-6450759800525444137, tensor_name="edge_359_data_source_task_index_0/cond/Merge_2", tensor_type=DT_INT64, _device="/job:ps/replica:0/task:2/cpu:0"]()]]
E0921 16:49:58.462749643    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24775, new grpc_chttp2_stream id=24769
E0921 16:49:58.462780714    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24775, new grpc_chttp2_stream id=24773
E0921 16:49:58.463260203    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24777
E0921 16:49:58.463277333    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24779
E0921 16:49:58.463283953    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24781
E0921 16:49:58.463289625    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24783
E0921 16:49:58.463295275    3036 parsing.c:801]              ignoring out of order new grpc_chttp2_stream request on server; last grpc_chttp2_stream id=24793, new grpc_chttp2_stream id=24785

Input pipeline

I am using the tensorflow dataset API for the input pipeline. A sketch of the dataset code looks like this:

def _dataset(filenames):
    input_files = tf.constant(filenames, dtype=tf.string)
    dataset = tf.contrib.data.TFRecordDataset(filenames)
    dataset = dataset.map(_parse_single_example)
    dataset = dataset.padded_batch(batch_size, padded_shapes=([-1], [-1]))
    iterator = dataset.make_initializable_iterator()
    words, labels = iterator.get_next()
    init_op = iterator.initializer
    return init_op, words, labels

Attempted data separation

First, we get the list of files for the current worker/task.

data_files = get_file_name_for_this_worker(task_index)

Then data_files is fed into the dataset. The effect we want is that no two workers process the same files.

Scoping the dataset for each worker
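
For illustration, here is a minimal sketch of how a helper like get_file_name_for_this_worker could split the 16 files into disjoint groups of 4. The original helper is not shown in the question, so the function name files_for_worker, the strided split, and the file names below are all assumptions rather than the actual implementation.

```python
def files_for_worker(all_files, task_index, num_workers):
    """Return the files assigned to one worker using a strided split.

    Hypothetical stand-in for get_file_name_for_this_worker: worker i
    takes files i, i + num_workers, i + 2 * num_workers, and so on.
    """
    if not 0 <= task_index < num_workers:
        raise ValueError("task_index must be in [0, num_workers)")
    # Sort first so every process sees the files in the same order.
    return sorted(all_files)[task_index::num_workers]


# Hypothetical file names; the real names are not given in the question.
all_files = ["train-%02d.tfrecord" % i for i in range(16)]
shards = [files_for_worker(all_files, i, 4) for i in range(4)]
```

With 16 files and 4 workers, each worker gets 4 files and no file appears in two shards, provided every process passes the same file list and the same num_workers.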

    with tf.device(
        tf.train.replica_device_setter(
            worker_device = worker_device,
            ps_device     = ps_device,
            cluster       = cluster,
            ps_strategy = load_balancer)):

            global DATA_SOURCES

            # Setup dataset for each worker (in each process)
            for worker_id in range(num_workers):
                with tf.variable_scope('data_source_task_index_%d' % worker_id):
                    DATA_SOURCES[worker_id] = _dataset(data_files)

            # Select the relevant data source for the current task
            init_op, words, labels = DATA_SOURCES[task_index]

            model = build_model(words, labels)
            ...
            sess, sv, train_op = synchronise(model, p_config, server)
            train(model, sess, train_op, init_op, sv)

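Training loop

The training iteration code looks like this. The data source is re-initialised after each complete pass over the local data (each local epoch). An OutOfRange exception indicates that the epoch is complete.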

def train(model, sess, train_op, init_op, sv):
    for epoch in range(FLAGS.num_epochs):
        print("Initialising the data source")
        sess.run(init_op)
        batch = 0
        while True:
            batch += 1
            try:
                if (batch % FLAGS.report_every_batch == 0):
                    on_report_batch(model, train_op, sess)
                else:
                    sess.run(train_op)
            except tf.errors.OutOfRangeError:
                on_epoch_complete(model, sess)
                break
    print("Out of epoch loop")
    if writer:
        writer.close()
        print('Done training, total elapsed time:  %f' % (time.time()-begin_time))

def on_report_batch(model, train_op, sess):
    ...
    _, batch_loss, step = sess.run([train_op, model.batch_loss, model.global_step])
    print("Step: %d," % step, 
          " Epoch: %2d," % (epoch+1), 
          " Batch: %3d," % batch, 
          " Batch Cost: %.4f," % batch_loss,
          " Elapsed Time: %f, " % (elapsed/60.0),
          " Time per batch: %f" % time_per_batch)

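The epoch-boundary pattern in train() can be tried out without a cluster. The sketch below is plain Python, not TensorFlow: a fresh iterator stands in for running init_op, next() stands in for sess.run(train_op), and StopIteration plays the role of tf.errors.OutOfRangeError. The name run_epochs and the toy data are assumptions made for illustration only.

```python
def run_epochs(make_iterator, num_epochs):
    """Count the batches consumed in each epoch.

    The iterator is re-created at the start of every epoch, mirroring
    the sess.run(init_op) call at the top of the epoch loop.
    """
    batches_per_epoch = []
    for _ in range(num_epochs):
        iterator = make_iterator()      # stands in for sess.run(init_op)
        batch = 0
        while True:
            try:
                next(iterator)          # stands in for sess.run(train_op)
                batch += 1
            except StopIteration:       # stands in for OutOfRangeError
                break
        batches_per_epoch.append(batch)
    return batches_per_epoch


# Toy data source with 5 "batches" per epoch.
counts = run_epochs(lambda: iter(range(5)), num_epochs=3)
```

Unlike the loop in the question, this sketch increments the batch counter only after a step succeeds, so the count is not off by one when the end of the data is reached.
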
The problem disappeared in the following nightly build of tensorflow:

http://ci.tensorflow.org/view/Nightly/job/nightly-matrix-linux-gpu/TF_BUILD_IS_OPT=OPT,TF_BUILD_IS_PIP=PIP,TF_BUILD_PYTHON_VERSION=PYTHON3.5,label=gpu-linux/

Here are some related comments:

https://github.com/tensorflow/tensorflow/issues/13213