如何为我的张量流模型提高此数据管道的性能

How to improve the performance of this data pipeline for my tensorflow model

我有一个正在 google-colab. The actual model is more complex, but I condensed it into a reproducible example 上训练的张量流模型(删除了 saving/restoring、学习率衰减、断言、tensorboard 事件、梯度裁剪等)。该模型运行合理(收敛到可接受的损失),我正在寻找 加速训练的方法 (每秒迭代次数)。

目前在 colab 的 GPU 上训练 1000 次迭代需要 10 分钟。我当前的批量大小为 512,这意味着模型每秒处理 ~850 个示例(我更喜欢批量大小为 512,除非其他大小提供合理的加速。通过自身更改批量大小不改变速度)。


所以目前我有一个以 tfrecord 格式存储的数据:这里是一个 500Mb example file, the total data-size is ~0.5Tb. This data passes through a reasonably heavy preprocessing step (I can't do preprocessing beforehand as it will increase the size of my tfrecords way above what I can afford). Preprocessing is done via tf.data 并且输出张量((batch_size, 8, 8, 24) 被视为 NHWC,(batch_size, 10))被传递到一个模型。示例 colab 不包含仅用作示例的简化模型。


我尝试了几种加速训练的方法:

数据预处理相关的代码在这里(这里是一个full reproducible example with example data):

_keys_to_map = {
    'd': tf.FixedLenFeature([], tf.string),  # data
    's': tf.FixedLenFeature([], tf.int64),   # score
}


def _parser(record):][3]
    parsed = tf.parse_single_example(record, _keys_to_map)
    return parsed['d'], parsed['s']


def init_tfrecord_dataset():
  files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
  random.shuffle(files_train)

  with tf.name_scope('tfr_iterator'):
    ds = tf.data.TFRecordDataset(files_train)      # define data from randomly ordered files
    ds = ds.shuffle(buffer_size=10000)             # select elements randomly from the buffer
    ds = ds.map(_parser)                           # map them based on tfrecord format
    ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
    ds = ds.repeat()                               # iterate infinitely 

    return ds.make_initializable_iterator()        # initialize the iterator


def iterator_to_data(iterator):
  """Creates a part of the graph which reads the raw data from an iterator and transforms it to a 
  data ready to be passed to model.

  Args:
    iterator      - iterator. Created by `init_tfrecord_dataset`

  Returns:
    data_board      - (BATCH_SIZE, 8, 8, 24) you can think about as NWHC for images.
    data_flags      - (BATCH_SIZE, 10)
    combined_score  - (BATCH_SIZE,)
  """

  b = tf.constant((128, 64, 32, 16, 8, 4, 2, 1), dtype=tf.uint8, name='unpacked_const')

  with tf.name_scope('tfr_parse'):
    with tf.name_scope('packed_data'):
      next_element = iterator.get_next()
      data_packed, score_int = next_element
      score = tf.cast(score_int, tf.float64, name='score_float')

    # 
    with tf.name_scope('data_unpacked'):
      data_unpacked = tf.reshape(tf.mod(tf.to_int32(tf.decode_raw(data_packed, tf.uint8)[:,:,None] // b), 2), [BATCH_SIZE, 1552], name='data_unpack')

    with tf.name_scope('score'):
      with tf.name_scope('is_mate'):
        score_is_mate = tf.cast(tf.squeeze(tf.slice(data_unpacked, [0, 1546], [BATCH_SIZE, 1])), tf.float64, name='is_mate')
      with tf.name_scope('combined'):
        combined_score = (1 - score_is_mate) * VALUE_A * tf.tanh(score / VALUE_K) + score_is_mate * tf.sign(score) * (VALUE_A + (1 - VALUE_A) / (VALUE_B - 1) * tf.reduce_max(tf.stack([tf.zeros(BATCH_SIZE, dtype=tf.float64), VALUE_B - tf.abs(score)]), axis=0))


    with tf.name_scope('board'):
      with tf.name_scope('reshape_layers'):
        data_board = tf.reshape(tf.slice(data_unpacked, [0, 0], [BATCH_SIZE, 8 * 8 * 24]), [BATCH_SIZE, 8, 8, 24], name='board_reshape')

      with tf.name_scope('combine_layers'):  
        data_board = tf.cast(tf.stack([
          data_board[:,:,:, 0],
          data_board[:,:,:, 4],
          data_board[:,:,:, 8],
          data_board[:,:,:,12],
          data_board[:,:,:,16],
          data_board[:,:,:,20],
          - data_board[:,:,:, 1],
          - data_board[:,:,:, 5],
          - data_board[:,:,:, 9],
          - data_board[:,:,:,13],
          - data_board[:,:,:,17],
          - data_board[:,:,:,21],
          data_board[:,:,:, 2],
          data_board[:,:,:, 6],
          data_board[:,:,:,10],
          data_board[:,:,:,14],
          data_board[:,:,:,18],
          data_board[:,:,:,22],
          - data_board[:,:,:, 3],
          - data_board[:,:,:, 7],
          - data_board[:,:,:,11],
          - data_board[:,:,:,15],
          - data_board[:,:,:,19],
          - data_board[:,:,:,23],
        ], axis=3), tf.float64, name='board_compact')

    with tf.name_scope('flags'):
      data_flags = tf.cast(tf.slice(data_unpacked, [0, 1536], [BATCH_SIZE, 10]), tf.float64, name='flags')

  return data_board, data_flags, combined_score

我正在寻找可以提高训练速度的实用解决方案(我已经尝试了大量的理论想法)(就 examples/second 而言)。我不是寻找提高模型准确性(或修改模型)的方法,因为这只是一个测试模型。

我花了很多时间来优化它(并放弃了)。因此,我很乐意悬赏 200 赏金以奖励具有良好解释的可行解决方案。

我有几个建议:

1)创建批次后,整个批次由iterator_to_data()函数处理。这并不是真正在多个线程上分配任务,至少不是在 api 级别。相反,您可以在 init_tfrecord_dataset() 函数中尝试这样的操作:

ds = tf.data.TFRecordDataset(files_train)      # define data from randomly ordered files
ds = ds.shuffle(buffer_size=10000)             # select elements randomly from the buffer
ds = ds.map(_parser)  
ds = ds.map(map_func=iterator_to_data, num_parallel_calls=FLAGS.num_preprocessing_threads)
ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
ds = ds.repeat()

您可能还想更改 iterator_to_data() 函数中的几行,因为输入参数不是具有上述更改的迭代器。

2) 您可能还想使用 tf.train.ProfilerHook 之类的方法获取分析信息。这可以告诉您瓶颈是 cpu 还是 gpu。例如,如果瓶颈与 CPU 有关,您可能会看到 GPU 操作正在等待 memcpyHtoD 操作完成。

to profile your training job is a good one, and may be necessary to understand the actual bottlenecks in your pipeline. The other suggestions in the Input Pipeline performance guide 应该也有用。

但是,还有另一种可能 "quick fix" 可能有用。在某些情况下,Dataset.map() 转换中的工作量可能非常小,主要取决于为每个元素调用函数的成本。在这些情况下,我们经常尝试 向量化 map 函数,并在 Dataset.batch() 转换之后移动它,以减少调用函数的次数(1/512 次) ,在这种情况下),并在每个批次上执行更大且可能更容易并行化的操作。幸运的是,您的管道可以按如下方式矢量化:

def _batch_parser(record_batch):
  # NOTE: Use `tf.parse_example()` to operate on batches of records.
  parsed = tf.parse_example(record_batch, _keys_to_map)
  return parsed['d'], parsed['s']

def init_tfrecord_dataset():
  files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
  random.shuffle(files_train)

  with tf.name_scope('tfr_iterator'):
    ds = tf.data.TFRecordDataset(files_train)      # define data from randomly ordered files
    ds = ds.shuffle(buffer_size=10000)             # select elements randomly from the buffer
    # NOTE: Change begins here.
    ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
    ds = ds.map(_batch_parser)                     # map batches based on tfrecord format
    # NOTE: Change ends here.
    ds = ds.repeat()                               # iterate infinitely 

    return ds.make_initializable_iterator()        # initialize the iterator

目前,矢量化是一项您必须手动进行的更改,但 tf.data 团队正在致力于 an optimization pass that provides automatic vectorization