在 TensorFlow Dataset 中使用 window() 函数访问多行
Use of window() function in TensorFlow Dataset to access more than one row
我用 tf.data.experimental.CsvDataset 从 .csv 文件读取了一个数据集,但在把它转换为"时间序列"(timeseries)形式时遇到了问题。
我想做的是一次访问数据集中的多行,以便将前两行的特征附加到当前行,并保留当前行的标签。我想对每一行都这样做(除了前两行)。我原以为使用 window() 函数是正确的方法,但现在不太确定了。
包含大约 300 列的原始数据集是通过读取一组 .csv 文件创建的,如下所示:
# One float32 default per selected column; this also fixes each column's dtype.
record_defaults = [tf.float32] * len(columns_indices_to_parse)
ds = tf.data.experimental.CsvDataset(
    filenames,
    record_defaults,
    header=True,
    select_cols=columns_indices_to_parse,
)
为了便于复现,我用 Dataset.from_tensor_slices() 和 Dataset.zip() 组合模拟了这个数据集:
import tensorflow as tf

tf.enable_eager_execution()

# NOTE: this snippet reproduces the problem being asked about; the windowing
# logic is intentionally left exactly as in the question.
with tf.Graph().as_default(), tf.Session() as sess:
    # Simulate what's being returned from CsvDataset():
    feature_1_ds = tf.data.Dataset.from_tensor_slices([1., 3., 5., 7., 9.])
    feature_2_ds = tf.data.Dataset.from_tensor_slices([2., 4., 6., 8., 10.])
    label_1_ds = tf.data.Dataset.from_tensor_slices([1.0, 1.0, 0.0, 1.0, 0.0])
    ds = tf.data.Dataset.zip((feature_1_ds, feature_2_ds, label_1_ds))

    # Do transformations to obtain "timeseries" data.
    def _parse_function_features(*row):
        """Stack the first two columns of a row into one feature tensor."""
        features = tf.stack(row[:2], axis=-1)
        return features

    def _parse_function_labels(*row):
        """Stack the columns from index 2 onwards into one label tensor."""
        labels = tf.stack(row[2:], axis=-1)
        return labels

    def _reshape(x):
        """Flatten rows into one."""
        return tf.reshape(x, shape=[-1])

    # window(3) without `shift` produces non-overlapping windows, which is
    # why the rows end up grouped like plain batches of three.
    ds_features = (
        ds.map(_parse_function_features)
        .window(3)
        .flat_map(lambda x: x.batch(3))
        .map(_reshape)
    )
    ds_labels = ds.map(_parse_function_labels).skip(2)
    ds = tf.data.Dataset.zip((ds_features, ds_labels))

    # Named `next_element` rather than `iter` to avoid shadowing the builtin.
    next_element = ds.make_one_shot_iterator().get_next()

    # Show dataset contents
    print('Result:')
    while True:
        try:
            print(sess.run(next_element))
        except tf.errors.OutOfRangeError:
            # Raised once the one-shot iterator is exhausted.
            break
我仍在琢磨 window() 转换的用法;我看到了这个 GitHub 问题(issue),但它并没有解决我的问题。
我现在得到的是:
(array([1., 2., 3., 4., 5., 6.], dtype=float32), array([0.], dtype=float32))
(array([ 7., 8., 9., 10.], dtype=float32), array([1.], dtype=float32))
问题在于它的行为类似于批处理 - 以三元组处理行。我想要实现的是:
(array([1., 2., 3., 4., 5., 6.], dtype=float32), array([0.], dtype=float32)) # with label of the third row
(array([3., 4., 5., 6., 7., 8.], dtype=float32), array([1.], dtype=float32)) # with label of the fourth row
(array([5., 6., 7., 8., 9., 10.], dtype=float32), array([0.], dtype=float32)) # with label of the fifth row
我有点卡住了,我不确定使用 window() 函数访问多行数据集是否是正确的方法。我之前问过非常相似的问题,但我删除了它,因为我认为我包含了太多细节,在这里我尽量保持精简。任何帮助将不胜感激,谢谢!
好的,经过一番尝试,终于得到了想要的效果。我有两种解决方案:一种将特征和标签作为单独的数据集处理,另一种对整个数据集一次性应用转换。两者都可能有用,具体取决于用例。
- 将特征和标签处理为单独的数据集:
import tensorflow as tf

tf.enable_eager_execution()

# Number of consecutive rows whose features are combined into one sample.
WINDOW_SIZE = 3

with tf.Graph().as_default(), tf.Session() as sess:
    # Simulate what's being returned from CsvDataset():
    feature_1_ds = tf.data.Dataset.from_tensor_slices([1., 3., 5., 7., 9.])
    feature_2_ds = tf.data.Dataset.from_tensor_slices([2., 4., 6., 8., 10.])
    label_1_ds = tf.data.Dataset.from_tensor_slices([1.0, 1.0, 0.0, 1.0, 0.0])
    ds = tf.data.Dataset.zip((feature_1_ds, feature_2_ds, label_1_ds))

    # Do transformations to obtain "timeseries" data.
    def _parse_function_features(*row):
        """Stack the first two columns of a row into one feature tensor."""
        features = tf.stack(row[:2], axis=-1)
        return features

    def _parse_function_labels(*row):
        """Stack the columns from index 2 onwards into one label tensor."""
        labels = tf.stack(row[2:], axis=-1)
        return labels

    def _reshape(x):
        """Flatten rows into one."""
        return tf.reshape(x, shape=[-1])

    # shift=1 makes consecutive windows overlap (a sliding window).
    # drop_remainder=True discards the short trailing windows, so features
    # and labels have the same number of elements instead of relying on
    # Dataset.zip() to truncate the longer one.
    ds_features = (
        ds.map(_parse_function_features)
        .window(WINDOW_SIZE, shift=1, drop_remainder=True)
        .flat_map(lambda x: x.batch(WINDOW_SIZE))
        .map(_reshape)
    )
    # Keep only the last label of each window, i.e. the label of the
    # "current" row.
    ds_labels = (
        ds.map(_parse_function_labels)
        .window(WINDOW_SIZE, shift=1, drop_remainder=True)
        .flat_map(lambda x: x.skip(WINDOW_SIZE - 1))
    )
    ds = tf.data.Dataset.zip((ds_features, ds_labels))

    # Named `next_element` rather than `iter` to avoid shadowing the builtin.
    next_element = ds.make_one_shot_iterator().get_next()

    # Show dataset contents
    print('Result:')
    while True:
        try:
            print(sess.run(next_element))
        except tf.errors.OutOfRangeError:
            # Raised once the one-shot iterator is exhausted.
            break
- 一次性转换数据集:
import tensorflow as tf

tf.enable_eager_execution()

# Number of consecutive rows whose features are combined into one sample.
WINDOW_SIZE = 3

with tf.Graph().as_default(), tf.Session() as sess:
    # Simulate what's being returned from CsvDataset():
    feature_1_ds = tf.data.Dataset.from_tensor_slices([1., 3., 5., 7., 9.])
    feature_2_ds = tf.data.Dataset.from_tensor_slices([2., 4., 6., 8., 10.])
    label_1_ds = tf.data.Dataset.from_tensor_slices([1.0, 1.0, 0.0, 1.0, 0.0])
    ds = tf.data.Dataset.zip((feature_1_ds, feature_2_ds, label_1_ds))

    # Do transformations to obtain "timeseries" data.
    def _parse_function(*row):
        """Split a row into (features, labels): first two columns vs. the rest."""
        features = tf.stack(row[:2], axis=-1)
        labels = tf.stack(row[2:], axis=-1)
        return features, labels

    def _reshape(features, labels):
        """Flatten features into one row; labels pass through unchanged."""
        return tf.reshape(features, shape=[-1]), labels

    ds = ds.map(_parse_function)
    # shift=1 makes consecutive windows overlap (a sliding window).
    # drop_remainder=True discards the short trailing windows, which could
    # not produce a label after skip(WINDOW_SIZE - 1) anyway.
    ds = ds.window(WINDOW_SIZE, shift=1, drop_remainder=True)
    # Per window: batch all feature rows together and keep only the last
    # label, i.e. the label of the "current" row.
    ds = ds.flat_map(
        lambda x, y: tf.data.Dataset.zip(
            (x.batch(WINDOW_SIZE), y.skip(WINDOW_SIZE - 1))
        )
    )
    ds = ds.map(_reshape)

    # Named `next_element` rather than `iter` to avoid shadowing the builtin.
    next_element = ds.make_one_shot_iterator().get_next()

    # Show dataset contents
    print('Result:')
    while True:
        try:
            print(sess.run(next_element))
        except tf.errors.OutOfRangeError:
            # Raised once the one-shot iterator is exhausted.
            break
这两种方案的输出都是:
Result:
(array([1., 2., 3., 4., 5., 6.], dtype=float32), array([0.], dtype=float32))
(array([3., 4., 5., 6., 7., 8.], dtype=float32), array([1.], dtype=float32))
(array([ 5., 6., 7., 8., 9., 10.], dtype=float32), array([0.], dtype=float32))
我用 tf.data.experimental.CsvDataset 从 .csv 文件读取了一个数据集,但在把它转换为"时间序列"(timeseries)形式时遇到了问题。
我想做的是一次访问数据集中的多行,以便将前两行的特征附加到当前行,并保留当前行的标签。我想对每一行都这样做(除了前两行)。我原以为使用 window() 函数是正确的方法,但现在不太确定了。
包含大约 300 列的原始数据集是通过读取一组 .csv 文件创建的,如下所示:
# One float32 default per selected column; this also fixes each column's dtype.
record_defaults = [tf.float32] * len(columns_indices_to_parse)
ds = tf.data.experimental.CsvDataset(
    filenames,
    record_defaults,
    header=True,
    select_cols=columns_indices_to_parse,
)
为了便于复现,我用 Dataset.from_tensor_slices() 和 Dataset.zip() 组合模拟了这个数据集:
import tensorflow as tf

tf.enable_eager_execution()

# NOTE: this snippet reproduces the problem being asked about; the windowing
# logic is intentionally left exactly as in the question.
with tf.Graph().as_default(), tf.Session() as sess:
    # Simulate what's being returned from CsvDataset():
    feature_1_ds = tf.data.Dataset.from_tensor_slices([1., 3., 5., 7., 9.])
    feature_2_ds = tf.data.Dataset.from_tensor_slices([2., 4., 6., 8., 10.])
    label_1_ds = tf.data.Dataset.from_tensor_slices([1.0, 1.0, 0.0, 1.0, 0.0])
    ds = tf.data.Dataset.zip((feature_1_ds, feature_2_ds, label_1_ds))

    # Do transformations to obtain "timeseries" data.
    def _parse_function_features(*row):
        """Stack the first two columns of a row into one feature tensor."""
        features = tf.stack(row[:2], axis=-1)
        return features

    def _parse_function_labels(*row):
        """Stack the columns from index 2 onwards into one label tensor."""
        labels = tf.stack(row[2:], axis=-1)
        return labels

    def _reshape(x):
        """Flatten rows into one."""
        return tf.reshape(x, shape=[-1])

    # window(3) without `shift` produces non-overlapping windows, which is
    # why the rows end up grouped like plain batches of three.
    ds_features = (
        ds.map(_parse_function_features)
        .window(3)
        .flat_map(lambda x: x.batch(3))
        .map(_reshape)
    )
    ds_labels = ds.map(_parse_function_labels).skip(2)
    ds = tf.data.Dataset.zip((ds_features, ds_labels))

    # Named `next_element` rather than `iter` to avoid shadowing the builtin.
    next_element = ds.make_one_shot_iterator().get_next()

    # Show dataset contents
    print('Result:')
    while True:
        try:
            print(sess.run(next_element))
        except tf.errors.OutOfRangeError:
            # Raised once the one-shot iterator is exhausted.
            break
我仍在琢磨 window() 转换的用法;我看到了这个 GitHub 问题(issue),但它并没有解决我的问题。
我现在得到的是:
(array([1., 2., 3., 4., 5., 6.], dtype=float32), array([0.], dtype=float32))
(array([ 7., 8., 9., 10.], dtype=float32), array([1.], dtype=float32))
问题在于它的行为类似于批处理 - 以三元组处理行。我想要实现的是:
(array([1., 2., 3., 4., 5., 6.], dtype=float32), array([0.], dtype=float32)) # with label of the third row
(array([3., 4., 5., 6., 7., 8.], dtype=float32), array([1.], dtype=float32)) # with label of the fourth row
(array([5., 6., 7., 8., 9., 10.], dtype=float32), array([0.], dtype=float32)) # with label of the fifth row
我有点卡住了,我不确定使用 window() 函数访问多行数据集是否是正确的方法。我之前问过非常相似的问题,但我删除了它,因为我认为我包含了太多细节,在这里我尽量保持精简。任何帮助将不胜感激,谢谢!
好的,经过一番尝试,终于得到了想要的效果。我有两种解决方案:一种将特征和标签作为单独的数据集处理,另一种对整个数据集一次性应用转换。两者都可能有用,具体取决于用例。
- 将特征和标签处理为单独的数据集:
import tensorflow as tf

tf.enable_eager_execution()

# Number of consecutive rows whose features are combined into one sample.
WINDOW_SIZE = 3

with tf.Graph().as_default(), tf.Session() as sess:
    # Simulate what's being returned from CsvDataset():
    feature_1_ds = tf.data.Dataset.from_tensor_slices([1., 3., 5., 7., 9.])
    feature_2_ds = tf.data.Dataset.from_tensor_slices([2., 4., 6., 8., 10.])
    label_1_ds = tf.data.Dataset.from_tensor_slices([1.0, 1.0, 0.0, 1.0, 0.0])
    ds = tf.data.Dataset.zip((feature_1_ds, feature_2_ds, label_1_ds))

    # Do transformations to obtain "timeseries" data.
    def _parse_function_features(*row):
        """Stack the first two columns of a row into one feature tensor."""
        features = tf.stack(row[:2], axis=-1)
        return features

    def _parse_function_labels(*row):
        """Stack the columns from index 2 onwards into one label tensor."""
        labels = tf.stack(row[2:], axis=-1)
        return labels

    def _reshape(x):
        """Flatten rows into one."""
        return tf.reshape(x, shape=[-1])

    # shift=1 makes consecutive windows overlap (a sliding window).
    # drop_remainder=True discards the short trailing windows, so features
    # and labels have the same number of elements instead of relying on
    # Dataset.zip() to truncate the longer one.
    ds_features = (
        ds.map(_parse_function_features)
        .window(WINDOW_SIZE, shift=1, drop_remainder=True)
        .flat_map(lambda x: x.batch(WINDOW_SIZE))
        .map(_reshape)
    )
    # Keep only the last label of each window, i.e. the label of the
    # "current" row.
    ds_labels = (
        ds.map(_parse_function_labels)
        .window(WINDOW_SIZE, shift=1, drop_remainder=True)
        .flat_map(lambda x: x.skip(WINDOW_SIZE - 1))
    )
    ds = tf.data.Dataset.zip((ds_features, ds_labels))

    # Named `next_element` rather than `iter` to avoid shadowing the builtin.
    next_element = ds.make_one_shot_iterator().get_next()

    # Show dataset contents
    print('Result:')
    while True:
        try:
            print(sess.run(next_element))
        except tf.errors.OutOfRangeError:
            # Raised once the one-shot iterator is exhausted.
            break
- 一次性转换数据集:
import tensorflow as tf

tf.enable_eager_execution()

# Number of consecutive rows whose features are combined into one sample.
WINDOW_SIZE = 3

with tf.Graph().as_default(), tf.Session() as sess:
    # Simulate what's being returned from CsvDataset():
    feature_1_ds = tf.data.Dataset.from_tensor_slices([1., 3., 5., 7., 9.])
    feature_2_ds = tf.data.Dataset.from_tensor_slices([2., 4., 6., 8., 10.])
    label_1_ds = tf.data.Dataset.from_tensor_slices([1.0, 1.0, 0.0, 1.0, 0.0])
    ds = tf.data.Dataset.zip((feature_1_ds, feature_2_ds, label_1_ds))

    # Do transformations to obtain "timeseries" data.
    def _parse_function(*row):
        """Split a row into (features, labels): first two columns vs. the rest."""
        features = tf.stack(row[:2], axis=-1)
        labels = tf.stack(row[2:], axis=-1)
        return features, labels

    def _reshape(features, labels):
        """Flatten features into one row; labels pass through unchanged."""
        return tf.reshape(features, shape=[-1]), labels

    ds = ds.map(_parse_function)
    # shift=1 makes consecutive windows overlap (a sliding window).
    # drop_remainder=True discards the short trailing windows, which could
    # not produce a label after skip(WINDOW_SIZE - 1) anyway.
    ds = ds.window(WINDOW_SIZE, shift=1, drop_remainder=True)
    # Per window: batch all feature rows together and keep only the last
    # label, i.e. the label of the "current" row.
    ds = ds.flat_map(
        lambda x, y: tf.data.Dataset.zip(
            (x.batch(WINDOW_SIZE), y.skip(WINDOW_SIZE - 1))
        )
    )
    ds = ds.map(_reshape)

    # Named `next_element` rather than `iter` to avoid shadowing the builtin.
    next_element = ds.make_one_shot_iterator().get_next()

    # Show dataset contents
    print('Result:')
    while True:
        try:
            print(sess.run(next_element))
        except tf.errors.OutOfRangeError:
            # Raised once the one-shot iterator is exhausted.
            break
这两种方案的输出都是:
Result:
(array([1., 2., 3., 4., 5., 6.], dtype=float32), array([0.], dtype=float32))
(array([3., 4., 5., 6., 7., 8.], dtype=float32), array([1.], dtype=float32))
(array([ 5., 6., 7., 8., 9., 10.], dtype=float32), array([0.], dtype=float32))