在 TensorFlow Dataset 中使用 window() 函数访问多行

Use of window() function in TensorFlow Dataset to access more than one row

我在将 tf.data.experimental.CsvDataset 从 .csv 文件读取的数据集转换为 "timeseries" 时遇到问题。

我想做的是一次访问多行数据集,以便将前两行的特征附加到当前行并保留当前行的标签。我想对每一行都这样做(除了前两行)。我认为应用 window() 函数是正确的方法,但现在我不太确定。

包含大约 300 列的原始数据集是通过读取一组 .csv 文件创建的,如下所示:

ds = tf.data.experimental.CsvDataset(
            filenames,
            [tf.float32] * len(columns_indices_to_parse),
            header=True,
            select_cols=columns_indices_to_parse
        )

为了重现性,我使用了 Dataset.from_tensor_slices() 和 Dataset.zip() 的组合:

import tensorflow as tf

tf.enable_eager_execution()

with tf.Graph().as_default(), tf.Session() as sess:
    # Simulate what's being returned from CsvDataset():
    feature_1_ds = tf.data.Dataset.from_tensor_slices([1., 3., 5., 7., 9.])
    feature_2_ds = tf.data.Dataset.from_tensor_slices([2., 4., 6., 8., 10.])
    label_1_ds = tf.data.Dataset.from_tensor_slices([1.0, 1.0, 0.0, 1.0, 0.0])

    ds = tf.data.Dataset.zip((feature_1_ds, feature_2_ds, label_1_ds))

    # Do transformations to obtain "timeseries" data.
    def _parse_function_features(*row):
        features = tf.stack(row[:2], axis=-1)
        return features

    def _parse_function_labels(*row):
        labels = tf.stack(row[2:], axis=-1)
        return labels

    def _reshape(x):
        # Flatten rows into one.
        return tf.reshape(x, shape=[-1])

    ds_features = ds.map(_parse_function_features).window(3).flat_map(lambda x: x.batch(3)).map(_reshape)
    ds_labels = ds.map(_parse_function_labels).skip(2)
    ds = tf.data.Dataset.zip((ds_features, ds_labels))

    iter = ds.make_one_shot_iterator().get_next()
    # Show dataset contents
    print('Result:')
    while True:
        try:
            print(sess.run(iter))
        except tf.errors.OutOfRangeError:
            break

我仍在思考 window() 转换,我看到了 this GitHub 问题,但它并没有解决我的问题。

我现在得到的是:

(array([1., 2., 3., 4., 5., 6.], dtype=float32), array([0.], dtype=float32))
(array([ 7.,  8.,  9., 10.], dtype=float32), array([1.], dtype=float32))

问题在于它的行为类似于批处理 - 以三元组处理行。我想要实现的是:

(array([1., 2., 3., 4., 5., 6.], dtype=float32), array([0.], dtype=float32)) # with label of the third row
(array([3., 4., 5., 6., 7., 8.], dtype=float32), array([1.], dtype=float32)) # with label of the fourth row
(array([5., 6., 7., 8., 9., 10.], dtype=float32), array([0.], dtype=float32)) # with label of the fifth row

我有点卡住了,我不确定使用 window() 函数访问多行数据集是否是正确的方法。我之前问过非常相似的问题,但我删除了它,因为我认为我包含了太多细节,在这里我尽量保持精简。任何帮助将不胜感激,谢谢!

好的,经过多方面的解决,终于达到了要求的效果。我有两种解决方案:一种将特征和标签作为单独的数据集处理,另一种将转换一次性应用于数据集。两者都可能有用,具体取决于用例。

  1. 将特征和标签处理为单独的数据集:
import tensorflow as tf

tf.enable_eager_execution()

with tf.Graph().as_default(), tf.Session() as sess:
    # Simulate what's being returned from CsvDataset():
    feature_1_ds = tf.data.Dataset.from_tensor_slices([1., 3., 5., 7., 9.])
    feature_2_ds = tf.data.Dataset.from_tensor_slices([2., 4., 6., 8., 10.])
    label_1_ds = tf.data.Dataset.from_tensor_slices([1.0, 1.0, 0.0, 1.0, 0.0])

    ds = tf.data.Dataset.zip((feature_1_ds, feature_2_ds, label_1_ds))

    # Do transformations to obtain "timeseries" data.
    def _parse_function_features(*row):
        features = tf.stack(row[:2], axis=-1)
        return features

    def _parse_function_labels(*row):
        labels = tf.stack(row[2:], axis=-1)
        return labels

    def _reshape(x):
        # Flatten rows into one.
        return tf.reshape(x, shape=[-1])

    ds_features = ds.map(_parse_function_features).window(3, shift=1).flat_map(lambda x: x.batch(3)).map(_reshape)
    ds_labels = ds.map(_parse_function_labels).window(3, shift=1).flat_map(lambda x: x.skip(2))
    ds = tf.data.Dataset.zip((ds_features, ds_labels))

    iter = ds.make_one_shot_iterator().get_next()
    # Show dataset contents
    print('Result:')
    while True:
        try:
            print(sess.run(iter))
        except tf.errors.OutOfRangeError:
            break
  1. 一次性转换数据集:
import tensorflow as tf

tf.enable_eager_execution()

with tf.Graph().as_default(), tf.Session() as sess:
    # Simulate what's being returned from CsvDataset():
    feature_1_ds = tf.data.Dataset.from_tensor_slices([1., 3., 5., 7., 9.])
    feature_2_ds = tf.data.Dataset.from_tensor_slices([2., 4., 6., 8., 10.])
    label_1_ds = tf.data.Dataset.from_tensor_slices([1.0, 1.0, 0.0, 1.0, 0.0])

    ds = tf.data.Dataset.zip((feature_1_ds, feature_2_ds, label_1_ds))

    # Do transformations to obtain "timeseries" data.
    def _parse_function(*row):
        features = tf.stack(row[:2], axis=-1)
        labels = tf.stack(row[2:], axis=-1)
        return features, labels


    def _reshape(features, labels):
        # Flatten features into one row.
        return tf.reshape(features, shape=[-1]), labels


    ds = ds.map(_parse_function)
    ds = ds.window(3, shift=1)
    ds = ds.flat_map(lambda x, y: tf.data.Dataset.zip((x.batch(3), y.skip(2))))
    ds = ds.map(_reshape)

    iter = ds.make_one_shot_iterator().get_next()
    # Show dataset contents
    print('Result:')
    while True:
        try:
            print(sess.run(iter))
        except tf.errors.OutOfRangeError:
            break

对于这两个输出是:

Result:
(array([1., 2., 3., 4., 5., 6.], dtype=float32), array([0.], dtype=float32))
(array([3., 4., 5., 6., 7., 8.], dtype=float32), array([1.], dtype=float32))
(array([ 5.,  6.,  7.,  8.,  9., 10.], dtype=float32), array([0.], dtype=float32))