Tensorflow 输入管道,其中多行对应于单个观察?

Tensorflow input pipeline where multiple rows correspond to a single observation?

所以我刚刚开始使用 Tensorflow,并且我正在努力正确理解输入管道。

我正在研究的问题是序列分类。 我正在尝试读取形状为 (100000, 4) 的 CSV 文件。前 3 列是特征,第 4 列是标签。但是 - 数据表示长度为 10 的序列,即第 1-10 行是序列 1,第 11-20 行是序列 2 等。这也意味着每个标签重复 10 次。

所以在这个输入管道的某个时刻,我需要重塑我的特征张量,比如 tf.reshape(features, [batch_size_, rows_per_ob, input_dim ]). 并且只取我的标签张量的第 10 行,如 label[::rows_per_ob]

我应该指出的另一件事是我的实际数据集有数十亿行,所以我必须考虑性能。

我已经将文档和此处其他帖子中的以下代码放在一起,但我不认为我完全理解这一点,因为我看到以下错误:

INFO:tensorflow:Error reported to Coordinator: , Attempting to use uninitialized value input_producer_2/limit_epochs/epochs

似乎出现了超出范围的错误。

我也不知道如何处理这些批次。最初,我想我会重塑它们然后将它们送入 "feed_dict",但后来我读到这真的很糟糕,我应该使用 tf.data.Dataset 对象。但我不确定如何将这些批次输入到数据集中。我也不完全确定在此过程中重塑数据的最佳时间是什么时候?

还有最后一点混淆 - 当您将 Iterator 与 Dataset 对象一起使用时,我看到我们使用了 get_next() 方法。这是否意味着 Dataset 中的每个元素代表一整批数据?这是否意味着如果我们想改变批量大小,我们需要重建整个数据集对象?

我真的很难将所有部分组合在一起。如果有人对我有任何指示,将不胜感激!谢谢!

# import
import tensorflow as tf

# constants
filename = "tensorflow_test_data.csv"
num_rows = 100000
rows_per_ob = 10
batch_size_ = 5
num_epochs_ = 2
num_batches = int(num_rows * num_epochs_ / batch_size_ / rows_per_ob)

# read csv line
def read_from_csv(filename_queue):
    reader = tf.TextLineReader(skip_header_lines=1)
    _, value = reader.read(filename_queue)
    record_defaults = [[0.0], [0.0], [0.0], [0.0]]
    a, b, c, d = tf.decode_csv(value, record_defaults=record_defaults)
    features = tf.stack([a, b, c])
    return features, d

def input_pipeline(filename=filename, batch_size=batch_size_, num_epochs=num_epochs_):
    filename_queue = tf.train.string_input_producer([filename],
                                                    num_epochs=num_epochs,
                                                    shuffle=False)
    x, y = read_from_csv(filename_queue)
    x_batch, y_batch = tf.train.batch([x, y],
                                      batch_size = batch_size * rows_per_ob,
                                      num_threads=1,
                                      capacity=10000)
    return x_batch, y_batch

###
x, y = input_pipeline(filename, batch_size=batch_size_,
                      num_epochs = num_epochs_)

# I imagine using lists is wrong here - this was more just for me to
# see the output
x_list = []
y_list = []
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for _ in range(num_batches):
        x_batch, y_batch = sess.run([x, y])
        x_list.append(x_batch)
        y_list.append(y_batch)
    coord.request_stop()
    coord.join(threads)

您可以使用 tf.data.Dataset 对象来表达整个管道,这可能会使事情稍微简单一些:

dataset = tf.data.TextLineDataset(filename)

# Skip the header line.
dataset = dataset.skip(1)

# Combine 10 lines into a single observation.   
dataset = dataset.batch(rows_per_ob)

def parse_observation(line_batch):
  record_defaults = [[0.0], [0.0], [0.0], [0.0]]
  a, b, c, d = tf.decode_csv(value, record_defaults=record_defaults)
  features = tf.stack([a, b, c])
  label = d[-1]  # Take the label from the last row.
  return features, label

# Parse each observation into a `row_per_ob X 2` matrix of features and a
# scalar label.
dataset = dataset.map(parse_observation)

# Batch multiple observations.
dataset = dataset.batch(batch_size)

# Optionally add a prefetch for performance.
dataset = dataset.prefetch(1)

要使用数据集中的值,您可以制作一个 tf.data.Iterator 以获取下一个元素作为一对 tf.Tensor 对象,然后将它们用作模型的输入。

iterator = dataset.make_one_shot_iterator()

features_batch, label_batch = iterator.get_next()

# Use the `features_batch` and `label_batch` tensors as the inputs to
# the model, rather than fetching them and feeding them via the `Session`
# interface.
train_op = build_model(features_batch, label_batch)