Tensorflow 输入管道,其中多行对应于单个观察?
Tensorflow input pipeline where multiple rows correspond to a single observation?
所以我刚刚开始使用 Tensorflow,并且我正在努力正确理解输入管道。
我正在研究的问题是序列分类。
我正在尝试读取形状为 (100000, 4) 的 CSV 文件。前 3 列是特征,第 4 列是标签。但是 - 数据表示长度为 10 的序列,即第 1-10 行是序列 1,第 11-20 行是序列 2 等。这也意味着每个标签重复 10 次。
所以在这个输入管道的某个时刻,我需要重塑我的特征张量,比如 tf.reshape(features, [batch_size_, rows_per_ob, input_dim ]).
并且只取我的标签张量的第 10 行,如 label[::rows_per_ob]
我应该指出的另一件事是我的实际数据集有数十亿行,所以我必须考虑性能。
我已经将文档和此处其他帖子中的以下代码放在一起,但我不认为我完全理解这一点,因为我看到以下错误:
INFO:tensorflow:Error reported to Coordinator: , Attempting to use uninitialized value input_producer_2/limit_epochs/epochs
似乎出现了超出范围的错误。
我也不知道如何处理这些批次。最初,我想我会重塑它们然后将它们送入 "feed_dict",但后来我读到这真的很糟糕,我应该使用 tf.data.Dataset 对象。但我不确定如何将这些批次输入到数据集中。我也不完全确定在此过程中重塑数据的最佳时间是什么时候?
还有最后一点混淆 - 当您将 Iterator 与 Dataset 对象一起使用时,我看到我们使用了 get_next() 方法。这是否意味着 Dataset 中的每个元素代表一整批数据?这是否意味着如果我们想改变批量大小,我们需要重建整个数据集对象?
我真的很难将所有部分组合在一起。如果有人对我有任何指示,将不胜感激!谢谢!
# import
import tensorflow as tf
# constants
filename = "tensorflow_test_data.csv"
num_rows = 100000
rows_per_ob = 10
batch_size_ = 5
num_epochs_ = 2
num_batches = int(num_rows * num_epochs_ / batch_size_ / rows_per_ob)
# read csv line
def read_from_csv(filename_queue):
reader = tf.TextLineReader(skip_header_lines=1)
_, value = reader.read(filename_queue)
record_defaults = [[0.0], [0.0], [0.0], [0.0]]
a, b, c, d = tf.decode_csv(value, record_defaults=record_defaults)
features = tf.stack([a, b, c])
return features, d
def input_pipeline(filename=filename, batch_size=batch_size_, num_epochs=num_epochs_):
filename_queue = tf.train.string_input_producer([filename],
num_epochs=num_epochs,
shuffle=False)
x, y = read_from_csv(filename_queue)
x_batch, y_batch = tf.train.batch([x, y],
batch_size = batch_size * rows_per_ob,
num_threads=1,
capacity=10000)
return x_batch, y_batch
###
x, y = input_pipeline(filename, batch_size=batch_size_,
num_epochs = num_epochs_)
# I imagine using lists is wrong here - this was more just for me to
# see the output
x_list = []
y_list = []
with tf.Session() as sess:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for _ in range(num_batches):
x_batch, y_batch = sess.run([x, y])
x_list.append(x_batch)
y_list.append(y_batch)
coord.request_stop()
coord.join(threads)
您可以使用 tf.data.Dataset
对象来表达整个管道,这可能会使事情稍微简单一些:
dataset = tf.data.TextLineDataset(filename)
# Skip the header line.
dataset = dataset.skip(1)
# Combine 10 lines into a single observation.
dataset = dataset.batch(rows_per_ob)
def parse_observation(line_batch):
record_defaults = [[0.0], [0.0], [0.0], [0.0]]
a, b, c, d = tf.decode_csv(value, record_defaults=record_defaults)
features = tf.stack([a, b, c])
label = d[-1] # Take the label from the last row.
return features, label
# Parse each observation into a `row_per_ob X 2` matrix of features and a
# scalar label.
dataset = dataset.map(parse_observation)
# Batch multiple observations.
dataset = dataset.batch(batch_size)
# Optionally add a prefetch for performance.
dataset = dataset.prefetch(1)
要使用数据集中的值,您可以制作一个 tf.data.Iterator
以获取下一个元素作为一对 tf.Tensor
对象,然后将它们用作模型的输入。
iterator = dataset.make_one_shot_iterator()
features_batch, label_batch = iterator.get_next()
# Use the `features_batch` and `label_batch` tensors as the inputs to
# the model, rather than fetching them and feeding them via the `Session`
# interface.
train_op = build_model(features_batch, label_batch)
所以我刚刚开始使用 Tensorflow,并且我正在努力正确理解输入管道。
我正在研究的问题是序列分类。 我正在尝试读取形状为 (100000, 4) 的 CSV 文件。前 3 列是特征,第 4 列是标签。但是 - 数据表示长度为 10 的序列,即第 1-10 行是序列 1,第 11-20 行是序列 2 等。这也意味着每个标签重复 10 次。
所以在这个输入管道的某个时刻,我需要重塑我的特征张量,比如 tf.reshape(features, [batch_size_, rows_per_ob, input_dim ]). 并且只取我的标签张量的第 10 行,如 label[::rows_per_ob]
我应该指出的另一件事是我的实际数据集有数十亿行,所以我必须考虑性能。
我已经将文档和此处其他帖子中的以下代码放在一起,但我不认为我完全理解这一点,因为我看到以下错误:
INFO:tensorflow:Error reported to Coordinator: , Attempting to use uninitialized value input_producer_2/limit_epochs/epochs
似乎出现了超出范围的错误。
我也不知道如何处理这些批次。最初,我想我会重塑它们然后将它们送入 "feed_dict",但后来我读到这真的很糟糕,我应该使用 tf.data.Dataset 对象。但我不确定如何将这些批次输入到数据集中。我也不完全确定在此过程中重塑数据的最佳时间是什么时候?
还有最后一点混淆 - 当您将 Iterator 与 Dataset 对象一起使用时,我看到我们使用了 get_next() 方法。这是否意味着 Dataset 中的每个元素代表一整批数据?这是否意味着如果我们想改变批量大小,我们需要重建整个数据集对象?
我真的很难将所有部分组合在一起。如果有人对我有任何指示,将不胜感激!谢谢!
# import
import tensorflow as tf
# constants
filename = "tensorflow_test_data.csv"
num_rows = 100000
rows_per_ob = 10
batch_size_ = 5
num_epochs_ = 2
num_batches = int(num_rows * num_epochs_ / batch_size_ / rows_per_ob)
# read csv line
def read_from_csv(filename_queue):
reader = tf.TextLineReader(skip_header_lines=1)
_, value = reader.read(filename_queue)
record_defaults = [[0.0], [0.0], [0.0], [0.0]]
a, b, c, d = tf.decode_csv(value, record_defaults=record_defaults)
features = tf.stack([a, b, c])
return features, d
def input_pipeline(filename=filename, batch_size=batch_size_, num_epochs=num_epochs_):
filename_queue = tf.train.string_input_producer([filename],
num_epochs=num_epochs,
shuffle=False)
x, y = read_from_csv(filename_queue)
x_batch, y_batch = tf.train.batch([x, y],
batch_size = batch_size * rows_per_ob,
num_threads=1,
capacity=10000)
return x_batch, y_batch
###
x, y = input_pipeline(filename, batch_size=batch_size_,
num_epochs = num_epochs_)
# I imagine using lists is wrong here - this was more just for me to
# see the output
x_list = []
y_list = []
with tf.Session() as sess:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for _ in range(num_batches):
x_batch, y_batch = sess.run([x, y])
x_list.append(x_batch)
y_list.append(y_batch)
coord.request_stop()
coord.join(threads)
您可以使用 tf.data.Dataset
对象来表达整个管道,这可能会使事情稍微简单一些:
dataset = tf.data.TextLineDataset(filename)
# Skip the header line.
dataset = dataset.skip(1)
# Combine 10 lines into a single observation.
dataset = dataset.batch(rows_per_ob)
def parse_observation(line_batch):
record_defaults = [[0.0], [0.0], [0.0], [0.0]]
a, b, c, d = tf.decode_csv(value, record_defaults=record_defaults)
features = tf.stack([a, b, c])
label = d[-1] # Take the label from the last row.
return features, label
# Parse each observation into a `row_per_ob X 2` matrix of features and a
# scalar label.
dataset = dataset.map(parse_observation)
# Batch multiple observations.
dataset = dataset.batch(batch_size)
# Optionally add a prefetch for performance.
dataset = dataset.prefetch(1)
要使用数据集中的值,您可以制作一个 tf.data.Iterator
以获取下一个元素作为一对 tf.Tensor
对象,然后将它们用作模型的输入。
iterator = dataset.make_one_shot_iterator()
features_batch, label_batch = iterator.get_next()
# Use the `features_batch` and `label_batch` tensors as the inputs to
# the model, rather than fetching them and feeding them via the `Session`
# interface.
train_op = build_model(features_batch, label_batch)