Import huge non-image dataset in TensorFlow

I have a large dataset (300,000 examples x 33,000 features) which, of course, does not fit in memory. The data is saved in HDF5 format. The values are mostly zeros (sparse data). They look like this:

           Attr1    52  52  52  52  52  52  52  52 ...
           Attr2    umb umb umb umb umb umb umb umb ...
           CellID   TGC-1 TGG-1 CAG-1 TTC-1 GTG-1 GTA-1 CAA-1 CAC-1 ...

Acc     Gene                                      ...
243485  RP11-.3     0   0   0   0   0   0   0   0 ...
237613  FAM138A     0   0   0   0   0   0   0   0 ...
186092  OR4F5       0   0   0   0   0   0   0   0 ...
238009  RP11-.7     0   0   0   0   0   0   0   0 ...
239945  RP11-.8     0   0   0   0   0   0   0   0 ...
279457  FO538.2     0   0   0   0   0   0   0   0 ...
228463  AP006.2     0   0   0   0   0   0   0   0 ...
...     ...         ... ... ... ... ... ... ... ...

I have done the following to load the whole dataset into TensorFlow (loompy is just a package that uses hdf5 under the hood):

import tensorflow as tf
import numpy as np
import loompy as lp

batch_size = 1000

with lp.connect(filename, 'r') as ds:
    ds_shape = (batch_size, ds.shape[0])
    ds_dtype = ds[0:1, 0:1].dtype

    labels = np.asarray([ds.ca.CellID, ds.ca.Attr1]).T
    labels_shape = (batch_size, 1)

data_placeholder = tf.placeholder(ds_dtype, ds_shape)
labels_placeholder = tf.placeholder(labels[:,1].dtype, labels_shape)

dataset = tf.data.Dataset.from_tensor_slices((data_placeholder, labels_placeholder))
dataset = dataset.prefetch(batch_size)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()

with tf.Session() as sess:
    with lp.connect(filename, 'r') as ds:
        for i in range(0, ds.shape[1], batch_size):
            batch = ds[0 : ds_shape[1], i : i + batch_size].T
            batch_labels = np.asarray([ds.ca.CellID[i : i + batch_size],
                                       ds.ca.Attr1[i : i + batch_size]]).T[:,1]

            sess.run(iterator.initializer, feed_dict = {data_placeholder: batch,
                       labels_placeholder: batch_labels.reshape(batch_size, 1)})

            for _ in range(batch_size):
                print(sess.run(next_element))

Output:

(array([0, 0, 0, ..., 0, 0, 0], dtype=int32), array([b'52'], dtype=object))
(array([0, 0, 0, ..., 0, 0, 0], dtype=int32), array([b'52'], dtype=object))
...

However, this way I cannot split the data into train, test and evaluation sets. Also, I can only shuffle within each batch, which is not effective, since most of the time the data in a batch belong to the same class.

How can I manipulate this kind of data so that I can load it as train, test and evaluation sets and perform shuffling etc. (preferably taking as much advantage of my TitanX GPU as possible)?

You should definitely try Dask; it allows you to work with data that does not fit in memory, and it parallelizes computation so that you can use all the cores of your CPU. I also recommend moving your data from HDF to Parquet, which allows concurrent reads and writes, speeding things up. See this link, where Wes McKinney (the creator of pandas) goes into depth and compares it with other formats.

With Dask you can prepare the chunks for the train, test and validation sets and read them without exceeding the available memory.
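A minimal sketch of that idea, assuming the matrix has already been exported to a Parquet file (the file names here are hypothetical):

import dask.dataframe as dd

# Lazily open the Parquet file; nothing is pulled into memory yet.
df = dd.read_parquet('expression.parquet')   # hypothetical file name

# Split into train / test / eval without materializing the full dataset.
train, test, evaluation = df.random_split([0.8, 0.1, 0.1], random_state=42)

# Write each split back to Parquet so it can later be streamed chunk by chunk.
train.to_parquet('train.parquet')
test.to_parquet('test.parquet')
evaluation.to_parquet('eval.parquet')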

In case anyone is still interested in this topic, here is my solution to the problem. In the end I stuck with the Loompy file format, since it is really convenient for what I am doing (have a look at Loompy here). To import such a large amount of information into my model, I used the from_generator() function of the tf.data.Dataset TensorFlow API, and I wrote a generator that yields the data as needed.

Below is what my input function looks like:

import numpy as np
import loompy as lp
import tensorflow as tf
from sklearn.model_selection import train_test_split

model_input_name = ""
input_size = 10000
batch_size = 32
epochs = 10
FILE = 'data.loom'                           # path to the .loom file (adjust to your own)
TRAIN_RT, TEST_RT, EVAL_RT = 0.8, 0.1, 0.1   # train / test / eval split ratios
ae = False                                   # True to yield autoencoder-style targets (label == input)

# Input functions for train, test and eval sets.
def train_input_fn():
    return _input_fn('TRAIN')

def test_input_fn():
    return _input_fn('TEST')

def eval_input_fn():
    return _input_fn('EVAL')

# General purpose input function
def _input_fn(mode = 'TRAIN'):

    """
        Arguments
            mode : 'TRAIN', 'TEST', 'EVAL'
    """

    # A generator to yield data and labels from the given FILE,
    # based on the indices assigned to the "indices" variable.
    # If you change the labels, remember to update the from_generator()
    # parameters below, to reflect their datatype.
    def gen():
        with lp.connect(FILE, 'r') as ds:
            if ae:
                for i in indices:
                    yield {model_input_name: ds[:, i]}, ds[:, i]
            else:
                for i in indices:
                    yield {model_input_name: ds[:, i]}, ds.ca.x_CellType[i]

    # Get the indices for train, test and eval sets
    train_idx, test_idx, eval_idx = train_test_set_idx_split(TRAIN_RT, TEST_RT, EVAL_RT)

    # Check condition and assign the respective set to the "indices" variable
    if mode == 'TRAIN':
        indices = train_idx
    elif mode == 'TEST':
        indices = test_idx
    elif mode == 'EVAL':
        indices = eval_idx
    else:
        print("Wrong mode choice: ", mode)
        exit(1)

    dataset = tf.data.Dataset.from_generator(gen, ({model_input_name: tf.int64}, tf.int64),
                                             output_shapes=({model_input_name: [input_size,]}, []))

    # Shuffle, batch, map, prefetch and repeat your dataset.
    # If you need to do some preprocessing on the data, create your function on
    # the cell above, and call it within a map() function.

    dataset = dataset.shuffle(buffer_size=batch_size*50)
    dataset = dataset.batch(batch_size)

    dataset = dataset.map(_reshape_labels)
    dataset = dataset.map(_int2float)

    # Map on whatever other functions you need
    dataset = dataset.map( ... )

    dataset = dataset.prefetch(2)
    dataset = dataset.repeat(epochs)

    iterator = dataset.make_one_shot_iterator()

    return iterator.get_next()


# Get train, test, eval indices for the given dataset
def train_test_set_idx_split(train_rt, test_rt, eval_rt):
    """ This function returns indices for the train, test and evaluation sets,
        given an input Dataset.
        Arguments:
            train_rt: ratio of the train dataset
            test_rt:  ratio of the test dataset
            eval_rt:  ratio of the evaluation dataset

        Returns:
            train_idx: indices (of the given dataset) for the train dataset
            test_idx:  indices (of the given dataset) for the test dataset
            eval_idx:  indices (of the given dataset) for the evaluation dataset

        Note:
            This function will work correctly as long as (test_rt == eval_rt) is True.
            If you need (test_rt != eval_rt), you need something more sophisticated.
    """

    with lp.connect(FILE, 'r') as ds:
        idx = np.array(range(0, ds.shape[1]))

    train_idx, test_idx = train_test_split(idx, train_size=train_rt, test_size=test_rt+eval_rt)
    test_idx, eval_idx = train_test_split(test_idx, train_size=0.5, test_size=0.5)

    return train_idx, test_idx, eval_idx

# Reshape labels as needed
def _reshape_labels(data, labels):
    return data, tf.reshape(labels, (-1,1))
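For completeness, a minimal sketch of how these input functions could be consumed by a tf.estimator model. The _int2float helper is not shown above, so the version here is only a guess at its intent; the DNNClassifier (with placeholder hidden_units and n_classes) merely stands in for whatever model is actually used, and the sketch assumes the classification case (ae disabled) and that model_input_name has been set to an actual feature name:

# Hypothetical cast helper referenced by the pipeline above: int features -> float32.
def _int2float(data, labels):
    return {model_input_name: tf.cast(data[model_input_name], tf.float32)}, labels

# Stand-in estimator; any tf.estimator-based model is wired up the same way.
feature_columns = [tf.feature_column.numeric_column(model_input_name, shape=(input_size,))]
estimator = tf.estimator.DNNClassifier(feature_columns=feature_columns,
                                       hidden_units=[512, 128],    # placeholder architecture
                                       n_classes=10)                # placeholder class count

estimator.train(input_fn=train_input_fn)
estimator.evaluate(input_fn=eval_input_fn)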

The real gap between dask and machine-learning frameworks such as TensorFlow is distributed in-memory caching.

Dask is a very good choice for the preprocessing part, but how to hand the preprocessed data over to TensorFlow is the painful bit.
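To make that pain point concrete, here is a minimal sketch (random data, placeholder chunk size and normalization) of the naive hand-off: dask defines the preprocessing lazily, but every chunk still has to be computed into local process memory before TensorFlow can consume it.

import dask.array as da
import tensorflow as tf

# Lazy preprocessing in dask (random data stands in for the real 300,000 x 33,000 matrix).
x = da.random.random((300000, 33000), chunks=(1000, 33000))
x = (x - x.mean(axis=0)) / (x.std(axis=0) + 1e-8)

def gen():
    # Each row-block is computed (pulled into this process's memory) only when requested.
    for i in range(x.numblocks[0]):
        yield x.blocks[i].compute()

dataset = tf.data.Dataset.from_generator(gen, tf.float64,
                                         output_shapes=tf.TensorShape([None, 33000]))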

Vineyard (v6d.io) addresses this intermediate data-sharing problem by providing integrations with data engines such as dask and with ML engines such as tf and pytorch. Here is an example (https://v6d.io/examples/distributed-learning.html) showing how to transfer preprocessed data from dask to horovod.keras; hopefully it helps.