使用 parquet 文件的 Tensorflow 时间序列分类
Tensorflow time-series classification using parquet files
我目前收到以下错误之一(取决于数据准备的顺序):
TypeError: Inputs to a layer should be tensors. Got: <tensorflow.python.data.ops.dataset_ops._NestedVariant object at 0x000001E02F62FB00>
TypeError: Inputs to a layer should be tensors. Got: <_VariantDataset shapes: OrderedDict
背景:我有一些镶木地板文件,其中每个文件都是一个多变量时间序列。由于我将文件用于多元时间序列分类问题,因此我将标签存储在单个 numpy 数组中。我需要使用 tf.data.Dataset
来读取文件,因为我无法将它们全部放入内存中。
这是一个重现我的错误的工作示例:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Masking, LSTM, Dropout, Dense
#!pip install tensorflow-io
import tensorflow_io as tfio
num_files = 10
num_features = 3
num_timesteps = 50
num_classes = 2
batch_size = 2
for i in range(num_files):
df = pd.DataFrame({"A": np.random.rand(num_timesteps), "B": np.random.rand(num_timesteps), "C": np.random.rand(num_timesteps)})
df.to_parquet("file_{}.parquet".format(i))
columns_init = {"A": tf.TensorSpec(tf.TensorShape([]), tf.float32), "B": tf.TensorSpec(tf.TensorShape([]), tf.float32), "C": tf.TensorSpec(tf.TensorShape([]), tf.float32)}
labels = np.array([0, 1, 1, 1, 0, 1, 0, 0, 1, 0])
train_split_size = 0.8
num_train_files = int(train_split_size * num_files)
train_names = ["file_{}.parquet".format(i) for i in range(num_train_files)]
val_names = ["file_{}.parquet".format(i) for i in range(num_train_files, num_files)]
y_train = labels[ : num_train_files]
y_val = labels[num_train_files : num_files]
def map_fn(file_names, label_ds):
return tfio.IODataset.from_parquet(file_names, columns=columns_init), label_ds
train_ds = tf.data.Dataset.from_tensor_slices((train_names, y_train))
train_ds = train_ds.shuffle(buffer_size = num_train_files)
train_ds = train_ds.map(map_fn)
train_ds = train_ds.batch(batch_size)
train_ds = train_ds.prefetch(batch_size)
val_ds = tf.data.Dataset.from_tensor_slices((val_names, y_val))
# No need for shuffling the validation set
val_ds = val_ds.map(map_fn)
val_ds = val_ds.batch(batch_size)
val_ds = val_ds.prefetch(batch_size)
ip = Input(shape=(num_timesteps, num_features))
x = Masking()(ip)
x = LSTM(8)(x)
x = Dropout(0.8)(x)
out = Dense(1, activation='softmax')(x)
model = Model(ip, out)
model.compile(optimizer='adam',
loss='binary_crossentropy',
metrics=["accuracy"])
model.fit(train_ds, epochs=10, validation_data=val_ds)
如何克服这个错误?我宁愿将我的文件分开并仅随机播放它们的批处理方式,因为我不想干涉文件中的时间序列。 .csv
文件而不是 .parquet
是否有类似的解决方案。我更喜欢 parquet 文件,因为它们更轻便且更易于阅读,但如果没有周转时间,我很乐意转换我的文件。
对于遇到类似问题的任何人,我找到了一个解决方法,但并不简单。在这种情况下,我定义了一个 common_ds
函数来从文件中读取所有数据。我应用了批处理,其中批大小等于时间序列长度,以便在存储观察时拆分观察。 (注意:这假定文件已经过预处理,并且所有文件的行数都相同。)将特征与标签组合后,根据所需的批量大小对数据进行混洗和批处理。最后一步使用 pack_features_function
将格式更改为可以提供给模型的张量形状。
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Masking, LSTM, Dropout, Dense, Input
#!pip install tensorflow-io
import tensorflow_io as tfio
num_files = 10
num_features = 3
num_timesteps = 50
num_classes = 2
batch_size = 2
for i in range(num_files):
df = pd.DataFrame({"A": np.random.rand(num_timesteps),
"B": np.random.rand(num_timesteps),
"C": np.random.rand(num_timesteps)})
df.to_parquet("file_{}.parquet".format(i))
columns_init = {"A": tf.TensorSpec(tf.TensorShape([]), tf.float32),
"B": tf.TensorSpec(tf.TensorShape([]), tf.float32),
"C": tf.TensorSpec(tf.TensorShape([]), tf.float32)}
labels = np.array([0, 1, 1, 1, 0, 1, 0, 0, 1, 0])
train_split_size = 0.8
num_train_files = int(train_split_size * num_files)
train_names = ["file_{}.parquet".format(i) for i in range(num_train_files)]
val_names = ["file_{}.parquet".format(i) for i in range(num_train_files, num_files)]
y_train = labels[ : num_train_files]
y_val = labels[num_train_files : num_files]
def make_common_ds(files):
common_ds = tfio.IODataset.from_parquet(files[0], columns=columns_init)
for file_name in files[1:]:
ds = tfio.IODataset.from_parquet(file_name, columns=columns_init)
common_ds = common_ds.concatenate(ds)
return common_ds
def pack_features_vector(features, labels):
"""Pack the features into a single array."""
features = tf.stack(list(features.values()), axis=2)
return features, labels
train_names_ds = make_common_ds(train_names)
train_names_ds = train_names_ds.batch(num_timesteps)
train_label_ds = tf.data.Dataset.from_tensor_slices(y_train)
train_ds = tf.data.Dataset.zip((train_names_ds, train_label_ds))
train_ds = train_ds.shuffle(buffer_size = num_train_files)
train_ds = train_ds.batch(batch_size)
train_ds = train_ds.prefetch(batch_size)
train_ds = train_ds.map(pack_features_vector)
val_names_ds = make_common_ds(val_names)
val_names_ds = val_names_ds.batch(num_timesteps)
val_label_ds = tf.data.Dataset.from_tensor_slices(y_val)
val_ds = tf.data.Dataset.zip((val_names_ds, val_label_ds))
# No need to shuffle the validation set
val_ds = val_ds.batch(batch_size)
val_ds = val_ds.prefetch(batch_size)
val_ds = val_ds.map(pack_features_vector)
ip = Input(shape=(num_timesteps, num_features))
x = Masking()(ip)
x = LSTM(8)(x)
x = Dropout(0.8)(x)
out = Dense(1, activation='softmax')(x)
model = Model(ip, out)
model.compile(optimizer='adam',
loss='binary_crossentropy',
metrics=["accuracy"])
model.fit(train_ds, epochs=10, validation_data=val_ds)
我目前收到以下错误之一(取决于数据准备的顺序):
TypeError: Inputs to a layer should be tensors. Got: <tensorflow.python.data.ops.dataset_ops._NestedVariant object at 0x000001E02F62FB00>
TypeError: Inputs to a layer should be tensors. Got: <_VariantDataset shapes: OrderedDict
背景:我有一些镶木地板文件,其中每个文件都是一个多变量时间序列。由于我将文件用于多元时间序列分类问题,因此我将标签存储在单个 numpy 数组中。我需要使用 tf.data.Dataset
来读取文件,因为我无法将它们全部放入内存中。
这是一个重现我的错误的工作示例:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Masking, LSTM, Dropout, Dense
#!pip install tensorflow-io
import tensorflow_io as tfio
num_files = 10
num_features = 3
num_timesteps = 50
num_classes = 2
batch_size = 2
for i in range(num_files):
df = pd.DataFrame({"A": np.random.rand(num_timesteps), "B": np.random.rand(num_timesteps), "C": np.random.rand(num_timesteps)})
df.to_parquet("file_{}.parquet".format(i))
columns_init = {"A": tf.TensorSpec(tf.TensorShape([]), tf.float32), "B": tf.TensorSpec(tf.TensorShape([]), tf.float32), "C": tf.TensorSpec(tf.TensorShape([]), tf.float32)}
labels = np.array([0, 1, 1, 1, 0, 1, 0, 0, 1, 0])
train_split_size = 0.8
num_train_files = int(train_split_size * num_files)
train_names = ["file_{}.parquet".format(i) for i in range(num_train_files)]
val_names = ["file_{}.parquet".format(i) for i in range(num_train_files, num_files)]
y_train = labels[ : num_train_files]
y_val = labels[num_train_files : num_files]
def map_fn(file_names, label_ds):
return tfio.IODataset.from_parquet(file_names, columns=columns_init), label_ds
train_ds = tf.data.Dataset.from_tensor_slices((train_names, y_train))
train_ds = train_ds.shuffle(buffer_size = num_train_files)
train_ds = train_ds.map(map_fn)
train_ds = train_ds.batch(batch_size)
train_ds = train_ds.prefetch(batch_size)
val_ds = tf.data.Dataset.from_tensor_slices((val_names, y_val))
# No need for shuffling the validation set
val_ds = val_ds.map(map_fn)
val_ds = val_ds.batch(batch_size)
val_ds = val_ds.prefetch(batch_size)
ip = Input(shape=(num_timesteps, num_features))
x = Masking()(ip)
x = LSTM(8)(x)
x = Dropout(0.8)(x)
out = Dense(1, activation='softmax')(x)
model = Model(ip, out)
model.compile(optimizer='adam',
loss='binary_crossentropy',
metrics=["accuracy"])
model.fit(train_ds, epochs=10, validation_data=val_ds)
如何克服这个错误?我宁愿将我的文件分开并仅随机播放它们的批处理方式,因为我不想干涉文件中的时间序列。 .csv
文件而不是 .parquet
是否有类似的解决方案。我更喜欢 parquet 文件,因为它们更轻便且更易于阅读,但如果没有周转时间,我很乐意转换我的文件。
对于遇到类似问题的任何人,我找到了一个解决方法,但并不简单。在这种情况下,我定义了一个 common_ds
函数来从文件中读取所有数据。我应用了批处理,其中批大小等于时间序列长度,以便在存储观察时拆分观察。 (注意:这假定文件已经过预处理,并且所有文件的行数都相同。)将特征与标签组合后,根据所需的批量大小对数据进行混洗和批处理。最后一步使用 pack_features_function
将格式更改为可以提供给模型的张量形状。
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Masking, LSTM, Dropout, Dense, Input
#!pip install tensorflow-io
import tensorflow_io as tfio
num_files = 10
num_features = 3
num_timesteps = 50
num_classes = 2
batch_size = 2
for i in range(num_files):
df = pd.DataFrame({"A": np.random.rand(num_timesteps),
"B": np.random.rand(num_timesteps),
"C": np.random.rand(num_timesteps)})
df.to_parquet("file_{}.parquet".format(i))
columns_init = {"A": tf.TensorSpec(tf.TensorShape([]), tf.float32),
"B": tf.TensorSpec(tf.TensorShape([]), tf.float32),
"C": tf.TensorSpec(tf.TensorShape([]), tf.float32)}
labels = np.array([0, 1, 1, 1, 0, 1, 0, 0, 1, 0])
train_split_size = 0.8
num_train_files = int(train_split_size * num_files)
train_names = ["file_{}.parquet".format(i) for i in range(num_train_files)]
val_names = ["file_{}.parquet".format(i) for i in range(num_train_files, num_files)]
y_train = labels[ : num_train_files]
y_val = labels[num_train_files : num_files]
def make_common_ds(files):
common_ds = tfio.IODataset.from_parquet(files[0], columns=columns_init)
for file_name in files[1:]:
ds = tfio.IODataset.from_parquet(file_name, columns=columns_init)
common_ds = common_ds.concatenate(ds)
return common_ds
def pack_features_vector(features, labels):
"""Pack the features into a single array."""
features = tf.stack(list(features.values()), axis=2)
return features, labels
train_names_ds = make_common_ds(train_names)
train_names_ds = train_names_ds.batch(num_timesteps)
train_label_ds = tf.data.Dataset.from_tensor_slices(y_train)
train_ds = tf.data.Dataset.zip((train_names_ds, train_label_ds))
train_ds = train_ds.shuffle(buffer_size = num_train_files)
train_ds = train_ds.batch(batch_size)
train_ds = train_ds.prefetch(batch_size)
train_ds = train_ds.map(pack_features_vector)
val_names_ds = make_common_ds(val_names)
val_names_ds = val_names_ds.batch(num_timesteps)
val_label_ds = tf.data.Dataset.from_tensor_slices(y_val)
val_ds = tf.data.Dataset.zip((val_names_ds, val_label_ds))
# No need to shuffle the validation set
val_ds = val_ds.batch(batch_size)
val_ds = val_ds.prefetch(batch_size)
val_ds = val_ds.map(pack_features_vector)
ip = Input(shape=(num_timesteps, num_features))
x = Masking()(ip)
x = LSTM(8)(x)
x = Dropout(0.8)(x)
out = Dense(1, activation='softmax')(x)
model = Model(ip, out)
model.compile(optimizer='adam',
loss='binary_crossentropy',
metrics=["accuracy"])
model.fit(train_ds, epochs=10, validation_data=val_ds)