如何在张量流模型的 tf.data 管道中提供 .h5 文件
How to feed .h5 files in tf.data pipeline in tensorflow model
我正在尝试使用 tf.data 优化 .h5 数据的输入管道。但是遇到了一个TypeError: expected str, bytes or os.PathLike object, not Tensor
。我做了一项研究,但找不到任何有关将字符串张量转换为字符串的信息。
此简化代码可执行且return同样的错误:
batch_size = 1000
conv_size = 3
nb_conv = 32
learning_rate = 0.0001
# define parser function
def parse_function(fname):
with h5py.File(fname, 'r') as f: #Error comes from here
X = f['X'].reshape(batch_size, patch_size, patch_size, 1)
y = f['y'].reshape(batch_size, patch_size, patch_size, 1)
return X, y
# create a list of files path
flist = []
for dirpath, _, fnames in os.walk('./proc/'):
for fname in fnames:
if fname.startswith('{}_{}'.format(patch_size, batch_size)) and fname.endswith('h5'):
flist.append(fname)
# prefetch data
dataset = tf.data.Dataset.from_tensor_slices((flist))
dataset = dataset.shuffle(len(flist))
dataset = dataset.map(parse_function, num_parallel_calls=4)
dataset = dataset.batch(1)
dataset = dataset.prefetch(3)
# simplest model that I think of
X_ph = tf.placeholder(tf.float32, shape=None)
y_ph = tf.placeholder(tf.float32, shape=None)
W = tf.get_variable('w', shape=[conv_size, conv_size, 1, 1], initializer=tf.contrib.layers.xavier_initializer())
loss = tf.reduce_mean(tf.losses.mean_squared_error(tf.nn.softmax(labels=y_ph, predictions=tf.matmul(X_ph, W))))
train_op = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss)
# start session
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
print(sess.run(train_op, feed_dict={X_ph: dataset[0], y_ph: dataset[1]}))
显然 fname
是一个字符串张量,但位置参数只等待一个字符串。我找不到关于此的任何文档。而 的答案并没有解决这个问题。就我而言,我只使用 h5,其中一个 h5 存储一批。
更新解决方案:
感谢@kvish 的评论,解决了加载.h5 文件的部分。
代码升级为一个简单的转换层,占位符已被占用。 每个 .h5 是一个批次。 我想并行预取多个批次(h5py 不支持多线程读取,所以我将批次写入多个文件)。可以 复制粘贴并启动:
import h5py
import threading
import numpy as np
import tensorflow as tf
# generate some img data
for i in range(5):
with h5py.File('./test_{}.h5'.format(i), 'w') as f:
f.create_dataset('X', shape=(1000, 100, 100), dtype='float32', data=np.random.rand(10**7).reshape(1000, 100, 100))
f.create_dataset('y', shape=(1000, 100, 100), dtype='float32', data=np.random.rand(10**7).reshape(1000, 100, 100))
print(threading.get_ident())
# params
num_cores = 3
shuffle_size = 1
batch_size = 1
# read .h5 file
def parse_file(f):
print(f.decode('utf-8'))
with h5py.File(f.decode("utf-8"), 'r') as fi:
X = fi['X'][:].reshape(1000, 100, 100, 1)
y = fi['y'][:].reshape(1000, 100, 100, 1)
print(threading.get_ident()) # to see the thread id
return X, y
# py_func wrapper
def parse_file_tf(filename):
return tf.py_func(parse_file, [filename], [tf.float32, tf.float32])
# tf.data input pipeline
files = tf.data.Dataset.list_files('./test_*.h5')
dataset = files.map(parse_file_tf, num_parallel_calls=num_core)
dataset = dataset.batch(batch_size).shuffle(shuffle_size).prefetch(3)
it = dataset.make_initializable_iterator()
iter_init_op = it.initializer
X_it, y_it = it.get_next()
# simplest model that I can think of
with tf.name_scope("Conv1"):
W = tf.get_variable("W", shape=[3, 3, 1, 1],
initializer=tf.contrib.layers.xavier_initializer())
b = tf.get_variable("b", shape=[1], initializer=tf.contrib.layers.xavier_initializer())
layer1 = tf.nn.conv2d(X_it, W, strides=[1, 1, 1, 1], padding='SAME') + b
out = tf.nn.relu(layer1)
loss = tf.reduce_mean(tf.losses.mean_squared_error(labels=y_it, predictions=out))
train_op = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(loss)
# session
sess = tf.Session()
sess.run(tf.global_variables_initializer())
sess.run(iter_init_op)
sess.run([train_op])
sess.close()
不知何故会有另一个与此无关的 cudnn 问题 post。
tensorflow-cpu v1.12:工作正常
tensorflow-gpu v1.12:运行时 问题发生
Traceback (most recent call last): File
"/envs/tf/lib/python3.6/site-packages/tensorflow/python/client/session.py",
line 1334, in _do_call
return fn(*args) File "/envs/tf/lib/python3.6/site-packages/tensorflow/python/client/session.py",
line 1319, in _run_fn
options, feed_dict, fetch_list, target_list, run_metadata) File "/envs/tf/lib/python3.6/site-packages/tensorflow/python/client/session.py",
line 1407, in _call_tf_sessionrun
run_metadata) tensorflow.python.framework.errors_impl.NotFoundError: No algorithm
worked! [[{{node Conv1/Conv2D}} = Conv2D[T=DT_FLOAT,
data_format="NCHW", dilations=[1, 1, 1, 1], padding="SAME",
strides=[1, 1, 1, 1], use_cudnn_on_gpu=true,
_device="/job:localhost/replica:0/task:0/device:GPU:0"](gradients/Conv1/Conv2D_grad/Conv2DBackpropFilter-0-TransposeNHWCToNCHW-LayoutOptimizer,
W/read)]] [[{{node
mean_squared_error/num_present/broadcast_weights/assert_broadcastable/AssertGuard/Assert/Switch_2/_37}}
= _Recvclient_terminated=false, recv_device="/job:localhost/replica:0/task:0/device:CPU:0",
send_device="/job:localhost/replica:0/task:0/device:GPU:0",
send_device_incarnation=1, tensor_name="edge_63_me...t/Switch_2",
tensor_type=DT_INT32,
_device="/job:localhost/replica:0/task:0/device:CPU:0"]]
tensorflow-cpu v1.12: works fine!
这是一个示例,说明如何借助 py_func 包装函数。请注意,这在 TF V2 中已弃用。您可以按照文档了解更多详细信息。
def parse_function_wrapper(filename):
# Assuming your data and labels are float32
# Your input is parse_function, who arg is filename, and you get X and y as output
# whose datatypes are indicated by the tuple argument
features, labels = tf.py_func(
parse_function, [filename], (tf.float32, tf.float32))
return features, labels
# Create dataset of filenames.
dataset = tf.data.Dataset.from_tensor_slices(flist)
dataset = dataset.shuffle(len(flist))
dataset = dataset.map(parse_function_wrapper)
我正在尝试使用 tf.data 优化 .h5 数据的输入管道。但是遇到了一个TypeError: expected str, bytes or os.PathLike object, not Tensor
。我做了一项研究,但找不到任何有关将字符串张量转换为字符串的信息。
此简化代码可执行且return同样的错误:
batch_size = 1000
conv_size = 3
nb_conv = 32
learning_rate = 0.0001
# define parser function
def parse_function(fname):
with h5py.File(fname, 'r') as f: #Error comes from here
X = f['X'].reshape(batch_size, patch_size, patch_size, 1)
y = f['y'].reshape(batch_size, patch_size, patch_size, 1)
return X, y
# create a list of files path
flist = []
for dirpath, _, fnames in os.walk('./proc/'):
for fname in fnames:
if fname.startswith('{}_{}'.format(patch_size, batch_size)) and fname.endswith('h5'):
flist.append(fname)
# prefetch data
dataset = tf.data.Dataset.from_tensor_slices((flist))
dataset = dataset.shuffle(len(flist))
dataset = dataset.map(parse_function, num_parallel_calls=4)
dataset = dataset.batch(1)
dataset = dataset.prefetch(3)
# simplest model that I think of
X_ph = tf.placeholder(tf.float32, shape=None)
y_ph = tf.placeholder(tf.float32, shape=None)
W = tf.get_variable('w', shape=[conv_size, conv_size, 1, 1], initializer=tf.contrib.layers.xavier_initializer())
loss = tf.reduce_mean(tf.losses.mean_squared_error(tf.nn.softmax(labels=y_ph, predictions=tf.matmul(X_ph, W))))
train_op = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss)
# start session
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
print(sess.run(train_op, feed_dict={X_ph: dataset[0], y_ph: dataset[1]}))
显然 fname
是一个字符串张量,但位置参数只等待一个字符串。我找不到关于此的任何文档。而
更新解决方案: 感谢@kvish 的评论,解决了加载.h5 文件的部分。 代码升级为一个简单的转换层,占位符已被占用。 每个 .h5 是一个批次。 我想并行预取多个批次(h5py 不支持多线程读取,所以我将批次写入多个文件)。可以 复制粘贴并启动:
import h5py
import threading
import numpy as np
import tensorflow as tf
# generate some img data
for i in range(5):
with h5py.File('./test_{}.h5'.format(i), 'w') as f:
f.create_dataset('X', shape=(1000, 100, 100), dtype='float32', data=np.random.rand(10**7).reshape(1000, 100, 100))
f.create_dataset('y', shape=(1000, 100, 100), dtype='float32', data=np.random.rand(10**7).reshape(1000, 100, 100))
print(threading.get_ident())
# params
num_cores = 3
shuffle_size = 1
batch_size = 1
# read .h5 file
def parse_file(f):
print(f.decode('utf-8'))
with h5py.File(f.decode("utf-8"), 'r') as fi:
X = fi['X'][:].reshape(1000, 100, 100, 1)
y = fi['y'][:].reshape(1000, 100, 100, 1)
print(threading.get_ident()) # to see the thread id
return X, y
# py_func wrapper
def parse_file_tf(filename):
return tf.py_func(parse_file, [filename], [tf.float32, tf.float32])
# tf.data input pipeline
files = tf.data.Dataset.list_files('./test_*.h5')
dataset = files.map(parse_file_tf, num_parallel_calls=num_core)
dataset = dataset.batch(batch_size).shuffle(shuffle_size).prefetch(3)
it = dataset.make_initializable_iterator()
iter_init_op = it.initializer
X_it, y_it = it.get_next()
# simplest model that I can think of
with tf.name_scope("Conv1"):
W = tf.get_variable("W", shape=[3, 3, 1, 1],
initializer=tf.contrib.layers.xavier_initializer())
b = tf.get_variable("b", shape=[1], initializer=tf.contrib.layers.xavier_initializer())
layer1 = tf.nn.conv2d(X_it, W, strides=[1, 1, 1, 1], padding='SAME') + b
out = tf.nn.relu(layer1)
loss = tf.reduce_mean(tf.losses.mean_squared_error(labels=y_it, predictions=out))
train_op = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(loss)
# session
sess = tf.Session()
sess.run(tf.global_variables_initializer())
sess.run(iter_init_op)
sess.run([train_op])
sess.close()
不知何故会有另一个与此无关的 cudnn 问题 post。
tensorflow-cpu v1.12:工作正常
tensorflow-gpu v1.12:运行时 问题发生
Traceback (most recent call last): File "/envs/tf/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1334, in _do_call return fn(*args) File "/envs/tf/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1319, in _run_fn options, feed_dict, fetch_list, target_list, run_metadata) File "/envs/tf/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1407, in _call_tf_sessionrun run_metadata) tensorflow.python.framework.errors_impl.NotFoundError: No algorithm worked! [[{{node Conv1/Conv2D}} = Conv2D[T=DT_FLOAT, data_format="NCHW", dilations=[1, 1, 1, 1], padding="SAME", strides=[1, 1, 1, 1], use_cudnn_on_gpu=true, _device="/job:localhost/replica:0/task:0/device:GPU:0"](gradients/Conv1/Conv2D_grad/Conv2DBackpropFilter-0-TransposeNHWCToNCHW-LayoutOptimizer, W/read)]] [[{{node mean_squared_error/num_present/broadcast_weights/assert_broadcastable/AssertGuard/Assert/Switch_2/_37}} = _Recvclient_terminated=false, recv_device="/job:localhost/replica:0/task:0/device:CPU:0", send_device="/job:localhost/replica:0/task:0/device:GPU:0", send_device_incarnation=1, tensor_name="edge_63_me...t/Switch_2", tensor_type=DT_INT32, _device="/job:localhost/replica:0/task:0/device:CPU:0"]] tensorflow-cpu v1.12: works fine!
这是一个示例,说明如何借助 py_func 包装函数。请注意,这在 TF V2 中已弃用。您可以按照文档了解更多详细信息。
def parse_function_wrapper(filename):
# Assuming your data and labels are float32
# Your input is parse_function, who arg is filename, and you get X and y as output
# whose datatypes are indicated by the tuple argument
features, labels = tf.py_func(
parse_function, [filename], (tf.float32, tf.float32))
return features, labels
# Create dataset of filenames.
dataset = tf.data.Dataset.from_tensor_slices(flist)
dataset = dataset.shuffle(len(flist))
dataset = dataset.map(parse_function_wrapper)