升级到 tf.dataset 解析 csv 时无法正常工作

Upgrade to tf.dataset not working properly when parsing csv

我有一个 GCMLE 实验,我正在尝试升级我的 input_fn 以使用新的 tf.data 功能。我基于此 sample

创建了以下 input_fn
def input_fn(...):
    dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards) # shuffle up the list of input files
    dataset = dataset.interleave(lambda filename: # mix together records from cycle_length number of shards
                tf.data.TextLineDataset(filename).skip(1).map(lambda row: parse_csv(row, hparams)), cycle_length=5) 
    if shuffle:
      dataset = dataset.shuffle(buffer_size = 10000)
    dataset = dataset.repeat(num_epochs)
    dataset = dataset.batch(batch_size)
    iterator = dataset.make_one_shot_iterator()
    features = iterator.get_next()

    labels = features.pop(LABEL_COLUMN)

    return features, labels

我的parse_csv和我之前用的是一样的,但是现在不能用了。我可以解决一些问题,但我不完全理解 为什么 我会遇到这些问题。这是我的 parse_csv() 函数

的开始
def parse_csv(..):
    columns = tf.decode_csv(rows, record_defaults=CSV_COLUMN_DEFAULTS)
    raw_features = dict(zip(FIELDNAMES, columns))

    words = tf.string_split(raw_features['sentences']) # splitting words
    vocab_table = tf.contrib.lookup.index_table_from_file(vocabulary_file = hparams.vocab_file,
                default_value = 0)

....
  1. 马上这个 tf.string_split() 停止工作并且错误是 ValueError: Shape must be rank 1 but is rank 0 for 'csv_preprocessing/input_sequence_generation/StringSplit' (op: 'StringSplit') with input shapes: [], []. - 这很容易通过 [=21= 将 raw_features['sentences'] 打包到张量中来解决] 但我不明白为什么 this dataset 方法需要这样做?为什么在旧版本中可以正常工作?为了使形状与我的模型的其余部分相匹配,我最终需要通过 words = tf.squeeze(words, 0) 在最后删除这个额外的维度,因为我将这个 "unecessary" 维度添加到张量中。

  2. 无论出于何种原因,我也收到 table 未初始化的错误 tensorflow.python.framework.errors_impl.FailedPreconditionError: Table not initialized. 但是,此代码完全适用于我的旧 input_fn() (见下文)所以我不知道为什么我现在需要初始化 tables?我还没有想出这部分的解决方案。在我的 parse_csv 函数中我是否缺少能够使用 tf.contrib.lookup.index_table_from_file 的东西?

作为参考,这是我的旧 input_fn() 仍然有效:

def input_fn(...):
    filename_queue = tf.train.string_input_producer(tf.train.match_filenames_once(filenames), 
                num_epochs=num_epochs, shuffle=shuffle, capacity=32)
    reader = tf.TextLineReader(skip_header_lines=skip_header_lines)

    _, rows = reader.read_up_to(filename_queue, num_records=batch_size)

    features = parse_csv(rows, hparams)


        if shuffle:
            features = tf.train.shuffle_batch(
                features,
                batch_size,
                min_after_dequeue=2 * batch_size + 1,
                capacity=batch_size * 10,
                num_threads=multiprocessing.cpu_count(), 
                enqueue_many=True,
                allow_smaller_final_batch=True
            )
        else:
            features = tf.train.batch(
                features,
                batch_size,
                capacity=batch_size * 10,
                num_threads=multiprocessing.cpu_count(),
                enqueue_many=True,
                allow_smaller_final_batch=True
            )

labels = features.pop(LABEL_COLUMN)

return features, labels

更新 TF 1.7

我正在使用 TF 1.7(它应该具有@mrry 回答中提到的所有 TF 1.6 功能)重新访问它,但我仍然无法复制该行为。对于我的旧 input_fn(),我可以得到大约 13 steps/sec。我使用的新功能如下:

def input_fn(...):
    files = tf.data.Dataset.list_files(filenames).shuffle(num_shards)
    dataset = files.apply(tf.contrib.data.parallel_interleave(lambda filename: tf.data.TextLineDataset(filename).skip(1), cycle_length=num_shards))
    dataset = dataset.apply(tf.contrib.data.map_and_batch(lambda row:
            parse_csv_dataset(row, hparams = hparams), 
            batch_size = batch_size, 
            num_parallel_batches = multiprocessing.cpu_count())) 
    dataset = dataset.prefetch(1)
    if shuffle:
        dataset = dataset.shuffle(buffer_size = 10000)
    dataset = dataset.repeat(num_epochs)
    iterator = dataset.make_initializable_iterator()
    features = iterator.get_next()
    tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer)

    labels = {key: features.pop(key) for key in LABEL_COLUMNS}

    return features, labels 

我相信我正在关注所有 performance guildines,例如 1) 使用预取 2) 使用 map_and_batch 和 num_parallel_batches = 核心 3) 使用 parallel_interleave 4 ) 在重复之前应用随机播放。我没有使用的唯一步骤是缓存建议,但希望它真的只对第一个以及 "applying interleave, prefetch and shuffle first." 之后的时期有帮助——但是我发现在 [=87= 之后进行预取和随机播放] 大约有 10% 的加速。

缓冲区问题 我注意到的第一个性能问题是我的旧 input_fn() 花了我大约 13 分钟的时间来完成 20k 步,但即使 buffer_size 为 10,000(我的意思是我们正在等到我们处理了 10,000 个批次)我仍在等待 40 多分钟以使缓冲区变满。花这么长时间有意义吗?如果我知道我在 GCS 上的分片 .csv 已经随机化,是否可以接受 table 将这个 shuffle/buffer 大小变小?我正在尝试从 tf.train.shuffle_batch() 复制行为——但是,似乎在最坏的情况下它应该花费与达到 10k 步相同的 13 分钟才能填满缓冲区?

STEPS/SEC

即使缓冲区填满,全局 steps/sec 在与之前 input_fn() 得到 ~13 steps/sec。

马虎交错 我最终尝试用 sloppy_interleave() 替换 parallel_interleave() 因为这是@mrry 的另一个建议。当我切换到 sloppy_interleave 时,我得到了 14 steps/sec!我知道这意味着它不是确定性的,但这真的应该意味着它不是从一个 运行 (或纪元)到下一个的确定性吗?还是对此有更大的影响?我应该关心旧 shuffle_batch() 方法和 sloppy_interleave 之间的任何真正区别吗?这导致 4-5 倍改进的事实是否表明之前的阻塞因子是什么?

  1. 当您使用 tf.data.TextLineDataset, each element is a scalar string. In this respect, it is more similar to using tf.TextLineReader.read(), rather than the batch version tf.TextLineReader.read_up_to(), which returns a vector of strings. Unfortunately the tf.string_split() op 时需要矢量输入(尽管将来可能会更改),因此目前需要形状操作。

  2. Lookup table 与 tf.data 中的函数的交互略有不同。直觉是你应该在Dataset.map()调用声明一次查找table(这样它将被初始化一次)然后在[=]中捕获它17=] 函数调用 vocab_table.lookup()。像下面这样的东西应该可以工作:

    def input_fn(...):
      dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards)
    
      # Define `vocab_table` outside the map function and use it in `parse_csv()`.
      vocab_table = tf.contrib.lookup.index_table_from_file(
          vocabulary_file=hparams.vocab_file, default_value=0)
    
      def parse_csv(...):
        columns = tf.decode_csv(rows, record_defaults=CSV_COLUMN_DEFAULTS)
        raw_features = dict(zip(FIELDNAMES, columns))
        words = tf.string_split([raw_features['sentences']]) # splitting words
    
        # Use the captured `vocab_table` here.
        word_indices = vocab_table.lookup(words)
    
        # ...    
        features = ...
    
        # NOTE: Structure the output here so that you can simply return
        # the dataset from `input_fn()`.
        labels = features.pop(LABEL_COLUMN)
        return features, labels
    
      # NOTE: Consider using `tf.contrib.data.parallel_interleave()` to perform
      # the reads in parallel.
      dataset = dataset.interleave(
          lambda filename: (tf.data.TextLineDataset(filename)
                            .skip(1)
                            .map(lambda row: parse_csv(row, hparams),
                                 num_parallel_calls=multiprocessing.cpu_count())),
          cycle_length=5) 
    
      if shuffle:
        dataset = dataset.shuffle(buffer_size=10000)
      dataset = dataset.repeat(num_epochs)
      dataset = dataset.batch(batch_size)
    
      # NOTE: Add prefetching here to run the input pipeline in the background.
      dataset = dataset.prefetch(1)
    
      # NOTE: This requires TensorFlow 1.5 or later, but this change simplifies the
      # initialization of the lookup table.
      return dataset
    

在 TF 1.4(目前是与 GCMLE 一起使用的最新版本的 TF)中,您将无法将 make_one_shot_iterator() 与查找表一起使用(参见相关的 ) you will need to use Dataset.make_initializable_iterator() and then initialize iterator.initalizer with your default TABLES_INITIALIZER (from this )。 input_fn() 应该是这样的:

def input_fn(...):
  dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards)

  # Define `vocab_table` outside the map function and use it in `parse_csv()`.
  vocab_table = tf.contrib.lookup.index_table_from_file(
      vocabulary_file=hparams.vocab_file, default_value=0)

  dataset = dataset.interleave(
      lambda filename: (tf.data.TextLineDataset(filename)
                        .skip(1)
                        .map(lambda row: parse_csv(row, hparams),
                             num_parallel_calls=multiprocessing.cpu_count())),
      cycle_length=5) 

  if shuffle:
    dataset = dataset.shuffle(buffer_size=10000)
  dataset = dataset.repeat(num_epochs)
  dataset = dataset.batch(batch_size)
  iterator = dataset.make_initializable_iterator()
  features = iterator.get_next()

  # add iterator.intializer to be handled by default table initializers
  tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer) 

  labels = features.pop(LABEL_COLUMN)

  return features, labels