如何制作 from_tensor_slice 的嵌套结构以使用 tf.py_func 包装器在 Dataset.map 中传递两个参数
How to make a nested structure of from_tensor_slice to pass two arguments in Dataset.map with a tf.py_func wrapper
我试图通过使用 Dataset.map()
来创建我的输入管道,从而将 .h5 解析器函数映射到 py_func
包装器。我想在 map 函数中传递两个参数:filename
和 window_size
。以下代码有调用顺序:Dataset.map
--> _pyfn_wrapper
--> parse_h5
缺点是使用 map() 函数 _pyfn_wrapper 只能接受一个参数,因为 from_tensor_slices
不能压缩 2 种类型的数据:字符串然后是 int
def helper(window_size, batch_size, ncores=mp.cpu_count()):
flist = []
for dirpath, _, fnames in os.walk('./'):
for fname in fnames:
flist.append(os.path.abspath(os.path.join(dirpath, fname)))
f_len = len(flist)
# init list of files
batch = tf.data.Dataset.from_tensor_slices((tf.constant(flist))) #fixme: how to zip one list of string and a list of int
batch = batch.map_fn(_pyfn_wrapper, num_parallel_calls=ncores) #fixme: how to map two args
batch = batch.shuffle(batch_size).batch(batch_size, drop_remainder=True).prefetch(ncores + 6)
# construct iterator
it = batch.make_initializable_iterator()
iter_init_op = it.initializer
# get next img and label
X_it, y_it = it.get_next()
inputs = {'img': X_it, 'label': y_it, 'iterator_init_op': iter_init_op}
return inputs, f_len
def _pyfn_wrapper(filename): #fixme: args
# filename, window_size = args #fixme: try to separate args
window_size = 100
return tf.py_func(parse_h5, #wrapped pythonic function
[filename, window_size],
[tf.float32, tf.float32] #[input, output] dtype
)
def parse_h5(name, window_size):
with h5py.File(name.decode('utf-8'), 'r') as f:
X = f['X'][:].reshape(window_size, window_size, 1)
y = f['y'][:].reshape(window_size, window_size, 1)
return X, y
# create tf.data.Dataset
helper, f_len = helper(100, 5, True)
# inject into model
with tf.name_scope("Conv1"):
W = tf.get_variable("W", shape=[3, 3, 1, 1],
initializer=tf.contrib.layers.xavier_initializer())
b = tf.get_variable("b", shape=[1], initializer=tf.contrib.layers.xavier_initializer())
layer1 = tf.nn.conv2d(helper['img'], W, strides=[1, 1, 1, 1], padding='SAME') + b
logits = tf.nn.relu(layer1)
loss = tf.reduce_mean(tf.losses.mean_squared_error(labels=helper['label'], predictions=logits))
train_op = tf.train.AdamOptimizer(0.0001).minimize(loss)
# session
with tf.Session() as sess:
sess.run(helper['iterator_init_op'])
sess.run(tf.global_variables_initializer())
for step in range(f_len):
sess.run([train_op])
可以先运行下面的代码片段来先创建随机数据
import multiprocessing as mp
def write_h5(x):
with h5py.File('./{}.h5'.format(x), 'w') as f:
print(mp.current_process()) # see process ID
x = y = np.arange(-1, 1, 0.02)
xx, _ = np.meshgrid(x, y)
a = xx ** 2
b = np.add(a, np.random.randn(100, 100)) #do something and add gaussian noise
f.create_dataset('X', shape=(100, 100), dtype='float32', data=a)
f.create_dataset('y', shape=(100, 100), dtype='float32', data=b)
# init data
p = mp.Pool(mp.cpu_count())
p.map(write_h5, range(100))
使用Datasets
的嵌套结构作为@Sharky 的注释是解决方案之一。应该 unzip 这个嵌套的参数 在最后 parse_h5
函数而不是 _pyfn_wrapper
以避免错误:
TypeError: Tensor objects are only iterable when eager execution is
enabled. To iterate over this tensor use tf.map_fn.
还应该解码参数,因为通过 tf.py_func() 参数被转换为二进制文字。
代码修改:
def helper(...):
...
flist.append((os.path.abspath(os.path.join(dirpath, fname)), str(window_size)))
...
def _pyfn_wrapper(args):
return tf.py_func(parse_h5, #wrapped pythonic function
[args],
[tf.float32, tf.float32] #output dtype
)
def parse_h5(args):
name, window_size = args #only unzip the args here
window_size = int(window_size.decode('utf-8')) #and decode for converting bin to int
with h5py.File(name, 'r') as f:
...
我试图通过使用 Dataset.map()
来创建我的输入管道,从而将 .h5 解析器函数映射到 py_func
包装器。我想在 map 函数中传递两个参数:filename
和 window_size
。以下代码有调用顺序:Dataset.map
--> _pyfn_wrapper
--> parse_h5
缺点是使用 map() 函数 _pyfn_wrapper 只能接受一个参数,因为 from_tensor_slices
不能压缩 2 种类型的数据:字符串然后是 int
def helper(window_size, batch_size, ncores=mp.cpu_count()):
flist = []
for dirpath, _, fnames in os.walk('./'):
for fname in fnames:
flist.append(os.path.abspath(os.path.join(dirpath, fname)))
f_len = len(flist)
# init list of files
batch = tf.data.Dataset.from_tensor_slices((tf.constant(flist))) #fixme: how to zip one list of string and a list of int
batch = batch.map_fn(_pyfn_wrapper, num_parallel_calls=ncores) #fixme: how to map two args
batch = batch.shuffle(batch_size).batch(batch_size, drop_remainder=True).prefetch(ncores + 6)
# construct iterator
it = batch.make_initializable_iterator()
iter_init_op = it.initializer
# get next img and label
X_it, y_it = it.get_next()
inputs = {'img': X_it, 'label': y_it, 'iterator_init_op': iter_init_op}
return inputs, f_len
def _pyfn_wrapper(filename): #fixme: args
# filename, window_size = args #fixme: try to separate args
window_size = 100
return tf.py_func(parse_h5, #wrapped pythonic function
[filename, window_size],
[tf.float32, tf.float32] #[input, output] dtype
)
def parse_h5(name, window_size):
with h5py.File(name.decode('utf-8'), 'r') as f:
X = f['X'][:].reshape(window_size, window_size, 1)
y = f['y'][:].reshape(window_size, window_size, 1)
return X, y
# create tf.data.Dataset
helper, f_len = helper(100, 5, True)
# inject into model
with tf.name_scope("Conv1"):
W = tf.get_variable("W", shape=[3, 3, 1, 1],
initializer=tf.contrib.layers.xavier_initializer())
b = tf.get_variable("b", shape=[1], initializer=tf.contrib.layers.xavier_initializer())
layer1 = tf.nn.conv2d(helper['img'], W, strides=[1, 1, 1, 1], padding='SAME') + b
logits = tf.nn.relu(layer1)
loss = tf.reduce_mean(tf.losses.mean_squared_error(labels=helper['label'], predictions=logits))
train_op = tf.train.AdamOptimizer(0.0001).minimize(loss)
# session
with tf.Session() as sess:
sess.run(helper['iterator_init_op'])
sess.run(tf.global_variables_initializer())
for step in range(f_len):
sess.run([train_op])
可以先运行下面的代码片段来先创建随机数据
import multiprocessing as mp
def write_h5(x):
with h5py.File('./{}.h5'.format(x), 'w') as f:
print(mp.current_process()) # see process ID
x = y = np.arange(-1, 1, 0.02)
xx, _ = np.meshgrid(x, y)
a = xx ** 2
b = np.add(a, np.random.randn(100, 100)) #do something and add gaussian noise
f.create_dataset('X', shape=(100, 100), dtype='float32', data=a)
f.create_dataset('y', shape=(100, 100), dtype='float32', data=b)
# init data
p = mp.Pool(mp.cpu_count())
p.map(write_h5, range(100))
使用Datasets
的嵌套结构作为@Sharky 的注释是解决方案之一。应该 unzip 这个嵌套的参数 在最后 parse_h5
函数而不是 _pyfn_wrapper
以避免错误:
TypeError: Tensor objects are only iterable when eager execution is enabled. To iterate over this tensor use tf.map_fn.
还应该解码参数,因为通过 tf.py_func() 参数被转换为二进制文字。
代码修改:
def helper(...):
...
flist.append((os.path.abspath(os.path.join(dirpath, fname)), str(window_size)))
...
def _pyfn_wrapper(args):
return tf.py_func(parse_h5, #wrapped pythonic function
[args],
[tf.float32, tf.float32] #output dtype
)
def parse_h5(args):
name, window_size = args #only unzip the args here
window_size = int(window_size.decode('utf-8')) #and decode for converting bin to int
with h5py.File(name, 'r') as f:
...