Tensorflow time-series classification using parquet files

I am currently getting one of the following errors (depending on the order of the data preparation steps):

TypeError: Inputs to a layer should be tensors. Got: <tensorflow.python.data.ops.dataset_ops._NestedVariant object at 0x000001E02F62FB00>

TypeError: Inputs to a layer should be tensors. Got: <_VariantDataset shapes: OrderedDict

Background: I have a number of parquet files, where each file is one multivariate time series. Since I am using the files for a multivariate time-series classification problem, I store the labels in a single numpy array. I need to use tf.data.Dataset to read the files, because I cannot fit them all in memory.

Here is a working example that reproduces my error:

import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Masking, LSTM, Dropout, Dense, Input
#!pip install tensorflow-io
import tensorflow_io as tfio


num_files = 10
num_features = 3
num_timesteps = 50
num_classes = 2

batch_size = 2

for i in range(num_files):
    df = pd.DataFrame({"A": np.random.rand(num_timesteps), "B": np.random.rand(num_timesteps), "C": np.random.rand(num_timesteps)})
    df.to_parquet("file_{}.parquet".format(i))
    
columns_init = {"A": tf.TensorSpec(tf.TensorShape([]), tf.float32),
                "B": tf.TensorSpec(tf.TensorShape([]), tf.float32),
                "C": tf.TensorSpec(tf.TensorShape([]), tf.float32)}
    
labels = np.array([0, 1, 1, 1, 0, 1, 0, 0, 1, 0])

train_split_size = 0.8
num_train_files = int(train_split_size * num_files)

train_names = ["file_{}.parquet".format(i) for i in range(num_train_files)]
val_names = ["file_{}.parquet".format(i) for i in range(num_train_files, num_files)]

y_train = labels[ : num_train_files]
y_val = labels[num_train_files : num_files]

def map_fn(file_names, label_ds):
    return tfio.IODataset.from_parquet(file_names, columns=columns_init), label_ds

train_ds = tf.data.Dataset.from_tensor_slices((train_names, y_train))
train_ds = train_ds.shuffle(buffer_size = num_train_files)
train_ds = train_ds.map(map_fn)
train_ds = train_ds.batch(batch_size)
train_ds = train_ds.prefetch(batch_size)

val_ds = tf.data.Dataset.from_tensor_slices((val_names, y_val))
# No need for shuffling the validation set
val_ds = val_ds.map(map_fn)
val_ds = val_ds.batch(batch_size)
val_ds = val_ds.prefetch(batch_size)

ip = Input(shape=(num_timesteps, num_features))
x = Masking()(ip)
x = LSTM(8)(x)
x = Dropout(0.8)(x)
out = Dense(1, activation='sigmoid')(x)  # sigmoid (not softmax) for a single binary output unit

model = Model(ip, out)

model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=["accuracy"])

model.fit(train_ds, epochs=10, validation_data=val_ds)
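
I suspect the cause is that map_fn returns a tfio.IODataset rather than tensors, so after batching the pipeline yields nested datasets, which is what the _NestedVariant / _VariantDataset objects in the errors above refer to. A minimal check (a sketch reusing the imports above, not part of my real pipeline) seems to confirm that mapping to a dataset gives DatasetSpec elements instead of TensorSpec:

# Assumption: mapping a function that returns a tf.data.Dataset produces a
# dataset of datasets, whose elements are not tensors and are rejected by Keras layers.
names_only = tf.data.Dataset.from_tensor_slices(["file_0.parquet", "file_1.parquet"])
nested = names_only.map(lambda name: tf.data.Dataset.range(num_timesteps))
print(nested.element_spec)  # DatasetSpec(...), not a TensorSpec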

How can I overcome this error? I would prefer to keep my files separate and only shuffle how they are batched, because I do not want to interfere with the time series inside each file. Is there a similar solution for .csv files instead of .parquet? I prefer parquet files because they are lighter and easier to read, but I am happy to convert my files if there is no other workaround.

For anyone coming across a similar problem, I found a workaround, which is not straightforward. In this case I define a make_common_ds function that reads all of the data from the files. I apply batching with the batch size equal to the time-series length, so that the observations are split back up exactly as they were stored. (Note: this assumes that the files are already preprocessed and that all files have the same number of rows.) After combining the features with the labels, the data is shuffled and batched according to the desired batch size. The final step uses pack_features_vector to change the format into the tensor shape that can be fed to the model.

import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Masking, LSTM, Dropout, Dense, Input
#!pip install tensorflow-io
import tensorflow_io as tfio

num_files = 10
num_features = 3
num_timesteps = 50
num_classes = 2
batch_size = 2

for i in range(num_files):
    df = pd.DataFrame({"A": np.random.rand(num_timesteps),
                       "B": np.random.rand(num_timesteps), 
                       "C": np.random.rand(num_timesteps)})
    df.to_parquet("file_{}.parquet".format(i))
    
columns_init = {"A": tf.TensorSpec(tf.TensorShape([]), tf.float32), 
                "B": tf.TensorSpec(tf.TensorShape([]), tf.float32),
                "C": tf.TensorSpec(tf.TensorShape([]), tf.float32)}
    
labels = np.array([0, 1, 1, 1, 0, 1, 0, 0, 1, 0])

train_split_size = 0.8
num_train_files = int(train_split_size * num_files)

train_names = ["file_{}.parquet".format(i) for i in range(num_train_files)]
val_names = ["file_{}.parquet".format(i) for i in range(num_train_files, num_files)]

y_train = labels[ : num_train_files]
y_val = labels[num_train_files : num_files]

def make_common_ds(files):
    common_ds = tfio.IODataset.from_parquet(files[0], columns=columns_init)
    for file_name in files[1:]:
        ds = tfio.IODataset.from_parquet(file_name, columns=columns_init)
        common_ds = common_ds.concatenate(ds)
    return common_ds

def pack_features_vector(features, labels):
    """Pack the features into a single array."""
    features = tf.stack(list(features.values()), axis=2)
    return features, labels

train_names_ds = make_common_ds(train_names)
# Batching by the time-series length groups the concatenated rows back into one block per file
train_names_ds = train_names_ds.batch(num_timesteps)
train_label_ds = tf.data.Dataset.from_tensor_slices(y_train)
train_ds = tf.data.Dataset.zip((train_names_ds, train_label_ds))
train_ds = train_ds.shuffle(buffer_size = num_train_files)
train_ds = train_ds.batch(batch_size)
train_ds = train_ds.prefetch(batch_size)
train_ds = train_ds.map(pack_features_vector)

val_names_ds = make_common_ds(val_names)
val_names_ds = val_names_ds.batch(num_timesteps)
val_label_ds = tf.data.Dataset.from_tensor_slices(y_val)
val_ds = tf.data.Dataset.zip((val_names_ds, val_label_ds))
# No need to shuffle the validation set
val_ds = val_ds.batch(batch_size)
val_ds = val_ds.prefetch(batch_size)
val_ds = val_ds.map(pack_features_vector)

ip = Input(shape=(num_timesteps, num_features))
x = Masking()(ip)
x = LSTM(8)(x)
x = Dropout(0.8)(x)
out = Dense(1, activation='sigmoid')(x)  # sigmoid (not softmax) for a single binary output unit

model = Model(ip, out)

model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=["accuracy"])

model.fit(train_ds, epochs=10, validation_data=val_ds)
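
As a quick sanity check (optional, reusing the objects built above), inspecting one batch should show that each element is now a plain tensor of shape (batch_size, num_timesteps, num_features), which is what the Input layer expects:

# Inspect one training batch produced by the pipeline above
for features, batch_labels in train_ds.take(1):
    print(features.shape)      # (2, 50, 3) -> (batch_size, num_timesteps, num_features)
    print(batch_labels.shape)  # (2,)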