
How to make a nested structure of from_tensor_slice to pass two arguments in Dataset.map with a tf.py_func wrapper

I am trying to build my input pipeline with Dataset.map(), mapping an .h5 parser function through a py_func wrapper. I want to pass two arguments to the map function: filename and window_size. The code below has the call chain: Dataset.map --> _pyfn_wrapper --> parse_h5

The drawback is that with map(), _pyfn_wrapper can only accept one argument, since from_tensor_slices cannot zip two kinds of data: a list of strings and a list of ints.

import os
import multiprocessing as mp

import h5py
import tensorflow as tf


def helper(window_size, batch_size, ncores=mp.cpu_count()):
    flist = []
    for dirpath, _, fnames in os.walk('./'):
        for fname in fnames:
            if fname.endswith('.h5'):  # skip non-.h5 files (e.g. this script)
                flist.append(os.path.abspath(os.path.join(dirpath, fname)))
    f_len = len(flist)

    # init list of files
    batch = tf.data.Dataset.from_tensor_slices((tf.constant(flist)))  #fixme: how to zip one list of string and a list of int
    batch = batch.map(_pyfn_wrapper, num_parallel_calls=ncores)  #fixme: how to map two args
    batch = batch.shuffle(batch_size).batch(batch_size, drop_remainder=True).prefetch(ncores + 6)

    # construct iterator
    it = batch.make_initializable_iterator()
    iter_init_op = it.initializer

    # get next img and label
    X_it, y_it = it.get_next()
    inputs = {'img': X_it, 'label': y_it, 'iterator_init_op': iter_init_op}
    return inputs, f_len


def _pyfn_wrapper(filename):  #fixme: args
    # filename, window_size = args  #fixme: try to separate args
    window_size = 100
    return tf.py_func(parse_h5,  #wrapped pythonic function
                      [filename, window_size],
                      [tf.float32, tf.float32]  #[input, output] dtype
                      )


def parse_h5(name, window_size):
    with h5py.File(name.decode('utf-8'), 'r') as f:
        X = f['X'][:].reshape(window_size, window_size, 1)
        y = f['y'][:].reshape(window_size, window_size, 1)
        return X, y


# create tf.data.Dataset
inputs, f_len = helper(window_size=100, batch_size=5)
# inject into model
with tf.name_scope("Conv1"):
    W = tf.get_variable("W", shape=[3, 3, 1, 1],
                         initializer=tf.contrib.layers.xavier_initializer())
    b = tf.get_variable("b", shape=[1], initializer=tf.contrib.layers.xavier_initializer())
    layer1 = tf.nn.conv2d(inputs['img'], W, strides=[1, 1, 1, 1], padding='SAME') + b
    logits = tf.nn.relu(layer1)

loss = tf.reduce_mean(tf.losses.mean_squared_error(labels=inputs['label'], predictions=logits))
train_op = tf.train.AdamOptimizer(0.0001).minimize(loss)

# session
with tf.Session() as sess:
    sess.run(inputs['iterator_init_op'])
    sess.run(tf.global_variables_initializer())
    for step in range(f_len // 5):  # 5 = batch_size; drop_remainder discards the last partial batch
        sess.run([train_op])

You can first run the snippet below to create some random data:

import multiprocessing as mp

import h5py
import numpy as np

def write_h5(x):
    with h5py.File('./{}.h5'.format(x), 'w') as f:
        print(mp.current_process())  # see process ID
        x = y = np.arange(-1, 1, 0.02)
        xx, _ = np.meshgrid(x, y)
        a = xx ** 2
        b = np.add(a, np.random.randn(100, 100))  # do something and add gaussian noise
        f.create_dataset('X', shape=(100, 100), dtype='float32', data=a)
        f.create_dataset('y', shape=(100, 100), dtype='float32', data=b)

# init data
p = mp.Pool(mp.cpu_count())
p.map(write_h5, range(100))

Using a nested structure of Datasets, as @Sharky suggested in the comments, is one solution. The nested argument should be unzipped in the final parse_h5 function rather than in _pyfn_wrapper, in order to avoid the error:

TypeError: Tensor objects are only iterable when eager execution is enabled. To iterate over this tensor use tf.map_fn.

The arguments should also be decoded, since tf.py_func() converts them to binary literals (bytes).
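
For illustration, here is a minimal sketch (the show_type function is hypothetical, not part of the question) demonstrating that tf.py_func() hands string tensors to the wrapped Python function as bytes, which must be decoded before use:

import numpy as np
import tensorflow as tf

def show_type(arg):
    # the string tensor arrives as a bytes-like object, not a str
    print(type(arg))  # typically <class 'bytes'>
    return np.float32(arg.decode('utf-8'))

t = tf.py_func(show_type, [tf.constant('100')], tf.float32)
with tf.Session() as sess:
    print(sess.run(t))  # 100.0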

Code changes:

def helper(...):
     ...
     flist.append((os.path.abspath(os.path.join(dirpath, fname)), str(window_size)))
     ...
def _pyfn_wrapper(args):
    return tf.py_func(parse_h5,  #wrapped pythonic function
                      [args],
                      [tf.float32, tf.float32]  #output dtype
                      )

def parse_h5(args):
    name, window_size = args  #only unzip the args here
    window_size = int(window_size.decode('utf-8'))  #and decode for converting bin to int
    with h5py.File(name.decode('utf-8'), 'r') as f:
        ...
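
For completeness, here is a minimal sketch of how the modified pieces fit together (the import block and the .h5 filename filter are additions for self-containment, not part of the original code):

import os
import multiprocessing as mp

import h5py
import tensorflow as tf


def parse_h5(args):
    name, window_size = args                          # unzip the nested args here
    window_size = int(window_size.decode('utf-8'))    # py_func passes bytes
    with h5py.File(name.decode('utf-8'), 'r') as f:
        X = f['X'][:].reshape(window_size, window_size, 1)
        y = f['y'][:].reshape(window_size, window_size, 1)
        return X, y


def _pyfn_wrapper(args):
    # args is a single string tensor of shape (2,): [filename, window_size]
    return tf.py_func(parse_h5, [args], [tf.float32, tf.float32])


def helper(window_size, batch_size, ncores=mp.cpu_count()):
    flist = []
    for dirpath, _, fnames in os.walk('./'):
        for fname in fnames:
            if fname.endswith('.h5'):  # assumption: only the .h5 files should be parsed
                flist.append((os.path.abspath(os.path.join(dirpath, fname)),
                              str(window_size)))  # encode the int as a string

    # each element is a (filename, window_size) pair of strings
    batch = tf.data.Dataset.from_tensor_slices(tf.constant(flist))
    batch = batch.map(_pyfn_wrapper, num_parallel_calls=ncores)
    batch = batch.shuffle(batch_size).batch(batch_size, drop_remainder=True).prefetch(ncores + 6)

    it = batch.make_initializable_iterator()
    X_it, y_it = it.get_next()
    return {'img': X_it, 'label': y_it, 'iterator_init_op': it.initializer}, len(flist)

The model, loss, and session code from the question can then be used unchanged, since helper still returns the same dictionary of tensors.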