Tensorflow: Modern way to load large data

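I want to train a convolutional neural network (using tf.keras from Tensorflow version 1.13) with numpy arrays as input data. The training data (which I currently store in a single >30GB '.npz' file) does not fit into RAM all at once. What is the best way to save and load large datasets into a neural network for training? Since I have not managed to find a good answer to this (surely ubiquitous?) problem, I am hoping to hear one here. Thank you very much in advance for any help!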

Sources

Similar questions seem to have been asked many times, but they are several years old and usually come without a conclusive answer.

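My current understanding is that using TFRecord files is a good way to approach this problem. The most promising tutorial I have found so far that explains how to use TFRecord files with keras is medium.com. Other helpful sources were machinelearninguru.com and medium.com_source2 and the sources therein.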

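The official tensorflow documentation and tutorials (on tf.data.Dataset, Importing Data, tf_records etc.) did not help me. In particular, several of the examples given there did not work for me even without modification.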

My attempt at using TFRecord files

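I am assuming that TFRecords are a good way to solve my problem, but I am having a hard time using them. Here is an example I made based on the tutorial medium.com. I stripped down the code as much as I could.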

# python 3.6, tensorflow 1.13.
# Adapted from https://medium.com/@moritzkrger/speeding-up-keras-with-tfrecord-datasets-5464f9836c36
import tensorflow as tf
import numpy as np
from tensorflow.python import keras as keras


# Helper functions (see also https://www.tensorflow.org/tutorials/load_data/tf_records)
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def writeTFRecords():
    number_of_samples = 100  # create some random data to play with
    images, labels = (np.random.sample((number_of_samples, 256, 256, 1)), np.random.randint(0, 30, number_of_samples))

    writer = tf.python_io.TFRecordWriter("bla.tfrecord")

    for index in range(images.shape[0]):
        image = images[index]
        label = labels[index]

        feature = {'image':  _bytes_feature(tf.compat.as_bytes(image.tostring())),
                   'label':  _int64_feature(int(label))}

        example = tf.train.Example(features=tf.train.Features(feature=feature))
        writer.write(example.SerializeToString())
    writer.close()


def loadTFRecord(data_path):
    with tf.Session() as sess:
        feature = {'train/image': tf.FixedLenFeature([], tf.string),
                   'train/label': tf.FixedLenFeature([], tf.int64)}
        # Create a list of filenames and pass it to a queue
        filename_queue = tf.train.string_input_producer([data_path], num_epochs=1)
        # Define a reader and read the next record
        reader = tf.TFRecordReader()
        _, serialized_example = reader.read(filename_queue)
        # Decode the record read by the reader
        features = tf.parse_single_example(serialized_example, features=feature)
        # Convert the image data from string back to the numbers
        image = tf.decode_raw(features['train/image'], tf.float32)

        # Cast label data into int32
        label = tf.cast(features['train/label'], tf.int32)
        # Reshape image data into the original shape
        image = tf.reshape(image, [256, 256, 1])

        return image, label  # I'm not 100% sure that's how this works...


# ######### generate a TFRecords file in the working directory containing random data. #################################
writeTFRecords()
# ######## Load the TFRecords file and use it to train a simple example neural network. ################################
image, label = loadTFRecord("bla.tfrecord")

model_input = keras.layers.Input(tensor=image)
model_output = keras.layers.Flatten(input_shape=(-1, 256, 256, 1))(model_input)
model_output = keras.layers.Dense(16, activation='relu')(model_output)

train_model = keras.models.Model(inputs=model_input, outputs=model_output)
train_model.compile(optimizer=keras.optimizers.RMSprop(lr=0.0001),  
                    loss='mean_squared_error',
                    target_tensors=[label])

print("\n \n start training \n \n") # Execution gets stuck on fitting
train_model.fit(epochs=1, steps_per_epoch=10)  # no output or error messages.

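The code creates a TFRecord file and starts fitting, then just gets stuck with no output or error messages. I don't know what the problem is or how I could try to fix it.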
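One detail in the code above that may matter once the pipeline actually runs (it is probably not the cause of the hang): np.random.sample returns float64 values, while loadTFRecord decodes the stored bytes as tf.float32. The NumPy-only sketch below illustrates the effect. Using np.frombuffer as a stand-in for tf.decode_raw is an assumption made purely for illustration.

```python
import numpy as np

# Same shape as one image in the example above; np.random.sample yields float64.
image = np.random.sample((256, 256, 1))
raw = image.tobytes()  # 8 bytes per value

# Reinterpreting the bytes with the wrong dtype yields twice as many values,
# so a later reshape to (256, 256, 1) cannot succeed.
as_float32 = np.frombuffer(raw, dtype=np.float32)
as_float64 = np.frombuffer(raw, dtype=np.float64)

n_float32 = as_float32.size  # 2 * 256 * 256
n_float64 = as_float64.size  # 256 * 256
restored = as_float64.reshape(256, 256, 1)
```

Either casting the array with .astype(np.float32) before writing or decoding as float64 keeps both sides consistent.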

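While this is not really an answer to the original question (i.e. "what is the optimal way to train on large datasets"), I managed to get tfrecords and datasets to work. Of particular help was this tutorial on YouTube. I have included a minimal example with working code for anyone struggling with the same problem.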

# Developed using python 3.6, tensorflow 1.14.0.
# This code writes data (pairs (label, image) where label is int64 and image is np.ndarray) into .tfrecord files and
# uses them for training a simple neural network. It is meant as a minimal working example of how to use tfrecords. This
# solution is likely not optimal. If you know how to improve it, please comment on
#  Refer to links therein for further information.
import tensorflow as tf
import numpy as np
from tensorflow import keras  # public API; tensorflow.python is an internal module


# Helper functions (see also https://www.tensorflow.org/tutorials/load_data/tf_records)
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def write_tfrecords_file(out_path: str, images: np.ndarray, labels: np.ndarray) -> None:
    """Write all image-label pairs into a single .tfrecord file.
    :param out_path: File path of the .tfrecord file to generate or overwrite.
    :param images: array with first dimension being the image index. Every images[i].tobytes() is
        serialized and written into the file as 'image': _bytes_feature(img_bytes)
    :param labels: 1d array of integers. labels[i] is the label of images[i]. Written as 'label': _int64_feature(label)"""
    assert len(images) == len(labels)
    with tf.io.TFRecordWriter(out_path) as writer:  # could use the options parameter to enable compression
        for i in range(len(labels)):
            img_bytes = images[i].tobytes()  # Convert the image to raw bytes (tostring() is a deprecated alias).
            label = int(labels[i])  # Convert the numpy integer to a plain Python int.
            data = {'image': _bytes_feature(img_bytes), 'label': _int64_feature(label)}
            feature = tf.train.Features(feature=data)  # Wrap the data as TensorFlow Features.
            example = tf.train.Example(features=feature)  # Wrap again as a TensorFlow Example.
            serialized = example.SerializeToString()  # Serialize the data.
            writer.write(serialized)  # Write the serialized data to the TFRecords file.


def parse_example(serialized, shape=(256, 256, 1)):
    features = {'image': tf.io.FixedLenFeature([], tf.string), 'label': tf.io.FixedLenFeature([], tf.int64)}
    # Parse the serialized data so we get a dict with our data.
    parsed_example = tf.io.parse_single_example(serialized=serialized, features=features)
    label = parsed_example['label']
    image_raw = parsed_example['image']  # Get the image as raw bytes.
    image = tf.io.decode_raw(image_raw, tf.float32)  # Decode the raw bytes into a flat float32 tensor.
    image = tf.reshape(image, shape=shape)
    return image, label  # dataset.map() calls this once to build the graph, which then parses each record


# create some arbitrary data to play with: 10,000 images sized 256x256 with one colour channel. Use your custom np-arrays
IMAGE_WIDTH, NUM_OF_IMAGES, NUM_OF_CLASSES, COLOUR_CHANNELS = 256, 10_000, 10, 1
# using float32 to save memory. Must match type in parse_example(), tf.decode_raw(image_raw, tf.float32)
features_train = np.random.sample((NUM_OF_IMAGES, IMAGE_WIDTH, IMAGE_WIDTH, COLOUR_CHANNELS)).astype(np.float32)
labels_train = np.random.randint(low=0, high=NUM_OF_CLASSES, size=NUM_OF_IMAGES)  # one random label for each image
features_eval = features_train[:200]  # use the first 200 images as evaluation data for simplicity.
labels_eval = labels_train[:200]
write_tfrecords_file("train.tfrecord", features_train, labels_train)  # real data: split into files of several GB each
write_tfrecords_file("eval.tfrecord", features_eval, labels_eval)  # this may take a while. Consider a progressbar
# The files are complete. Now define a model and use datasets to feed the data from the .tfrecord files into the model.
model = keras.Sequential([keras.layers.Flatten(input_shape=(256, 256, 1)),
                          keras.layers.Dense(128, activation='relu'),
                          keras.layers.Dense(10, activation='softmax')])
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
# Check the docs for parameters (compression, buffer size, thread count). Also see www.tensorflow.org/guide/performance/datasets

train_dataset = tf.data.TFRecordDataset("train.tfrecord")  # specify a list (or dataset) of file names for large data
train_dataset = train_dataset.map(parse_example)  # parse tfrecords. Parameter num_parallel_calls may help performance.
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

validation_dataset = tf.data.TFRecordDataset("eval.tfrecord")
validation_dataset = validation_dataset.map(parse_example).batch(64)

model.fit(train_dataset, epochs=3)
# evaluate the results
results = model.evaluate(validation_dataset)
print('\n\nvalidation loss, validation acc:', results)
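The comments above suggest splitting large data into several .tfrecord files and passing a list of file names to TFRecordDataset. A minimal sketch of that idea follows. It assumes TensorFlow 2.x with eager execution (in 1.14 the dataset would be passed to model.fit rather than iterated directly). The helper names write_shards and parse_small_example, the shard file names, and the tiny 8x8 images are hypothetical and chosen only to keep the example fast.

```python
import os
import tempfile

import numpy as np
import tensorflow as tf


def write_shards(images, labels, out_dir, num_shards):
    """Hypothetical helper: spread the samples over num_shards .tfrecord files."""
    paths = []
    for shard, indices in enumerate(np.array_split(np.arange(len(labels)), num_shards)):
        path = os.path.join(out_dir, "train-%03d.tfrecord" % shard)
        with tf.io.TFRecordWriter(path) as writer:
            for i in indices:
                feature = {
                    "image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[images[i].tobytes()])),
                    "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[int(labels[i])])),
                }
                example = tf.train.Example(features=tf.train.Features(feature=feature))
                writer.write(example.SerializeToString())
        paths.append(path)
    return paths


def parse_small_example(serialized, shape=(8, 8, 1)):
    """Same parsing logic as parse_example above, for the small demo images."""
    features = {"image": tf.io.FixedLenFeature([], tf.string), "label": tf.io.FixedLenFeature([], tf.int64)}
    parsed = tf.io.parse_single_example(serialized=serialized, features=features)
    image = tf.reshape(tf.io.decode_raw(parsed["image"], tf.float32), shape)
    return image, parsed["label"]


# Ten tiny float32 images with labels 0..9, written into three shard files.
images = np.random.sample((10, 8, 8, 1)).astype(np.float32)
labels = np.arange(10)
shard_paths = write_shards(images, labels, tempfile.mkdtemp(), num_shards=3)

# A list of file names is read as one continuous stream of records.
dataset = tf.data.TFRecordDataset(shard_paths)
dataset = dataset.map(parse_small_example, num_parallel_calls=2).batch(4)

read_labels = []
for image_batch, label_batch in dataset:
    read_labels.extend(label_batch.numpy().tolist())
```

With real data, each shard would hold many more samples, and shuffling the list of file names between epochs is one common way to vary the reading order.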

Note that using some_keras_model.fit(..., validation_data=some_dataset) with dataset objects is tricky. It may result in TypeError: 'DatasetV1Adapter' object does not support indexing. This seems to be a bug (see github.com/tensorflow/tensorflow/issues/28995) and is supposedly fixed as of tf-nightly version '1.15.0-dev20190808'. The official tutorial uses this too, although it doesn't work in most versions. An easy but dirty-ish fix is to use verbose=0 (which only suppresses program output) and plot the validation results using tensorboard.
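As for the original problem of data that does not fit into RAM, one alternative to TFRecords (not used in the code above, and offered only as a sketch) is to store the array as a plain .npy file and memory-map it, so that only the slices being read are loaded. Memory mapping applies to .npy files rather than .npz archives, so this assumes the data can be rewritten in that format. The file name, the small array sizes, and the iter_batches helper are hypothetical.

```python
import os
import tempfile

import numpy as np


def iter_batches(npy_path, batch_size):
    """Yield consecutive batches from a .npy file without loading it fully."""
    data = np.load(npy_path, mmap_mode="r")  # memory-mapped, read-only view
    for start in range(0, data.shape[0], batch_size):
        # np.array(...) copies only this slice from disk into RAM.
        yield np.array(data[start:start + batch_size])


# Tiny stand-in for a large dataset: 10 images of 4x4 pixels, one channel.
path = os.path.join(tempfile.mkdtemp(), "images.npy")
# open_memmap creates the .npy file on disk, so it can be filled piece by piece.
out = np.lib.format.open_memmap(path, mode="w+", dtype=np.float32, shape=(10, 4, 4, 1))
for i in range(10):
    out[i] = i  # every pixel of image i is set to i
out.flush()
del out

batches = list(iter_batches(path, batch_size=4))
batch_shapes = [b.shape for b in batches]
```

A generator like this could, for example, be wrapped with tf.data.Dataset.from_generator to feed a keras model. Whether it performs better than TFRecords would depend on the storage and access pattern.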