了解 random_shuffle_queue 何时用完元素并关闭它
Understanding when random_shuffle_queue runs out of elements and close it
我有 1000
个大小为 32x32x3
的图像存储在 dummy.tfrecord
文件中。我想遍历数据集两次(2 个时期),所以我指定 tf.train.string_input_producer([dummy.tfrecord], num_epochs=2)
。对于批量大小 100
,我希望 tf.train.shuffle_batch
到 运行 2 * 10 = 20
次迭代,因为它需要 10
批次 100
耗尽 1000
图片。
我遵循了 ,它确实按预期产生了 20
次迭代。然而,最后,我收到了错误:
RandomShuffleQueue '_1_shuffle_batch/random_shuffle_queue' is closed and has insufficient elements (requested 100, current size 0)
这是有道理的,因为队列中还有 0
张图片。
如何关闭队列并干净地退出?也就是应该不会出错。
完整脚本如下:
import numpy as np
import tensorflow as tf
NUM_IMGS = 1000
tfrecord_file = 'dummy.tfrecord'
def read_from_tfrecord(filenames):
tfrecord_file_queue = tf.train.string_input_producer(filenames,
num_epochs=2)
reader = tf.TFRecordReader()
_, tfrecord_serialized = reader.read(tfrecord_file_queue)
tfrecord_features = tf.parse_single_example(tfrecord_serialized,
features={
'label': tf.FixedLenFeature([], tf.string),
'image': tf.FixedLenFeature([], tf.string),
}, name='features')
image = tf.decode_raw(tfrecord_features['image'], tf.uint8)
image = tf.reshape(image, shape=(32, 32, 3))
label = tf.cast(tfrecord_features['label'], tf.string)
#provide batches
images, labels = tf.train.shuffle_batch([image, label],
batch_size=100,
num_threads=4,
capacity=50,
min_after_dequeue=1)
return images, labels
imgs, lbls = read_from_tfrecord([tfrecord_file])
init_op = tf.group(tf.global_variables_initializer(),
tf.local_variables_initializer())
with tf.Session() as sess:
sess.run(init_op)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
while not coord.should_stop():
labels, images = sess.run([lbls, imgs])
print(images.shape) #PRINTED 20 TIMES BUT FAILED AT THE 21ST
coord.request_stop()
coord.join(threads)
这里是生成dummy.tfrecord
文件的脚本,如果有人想复制:
def generate_image_binary():
images = np.random.randint(0,255, size=(NUM_POINTS, 32, 32, 3),
dtype=np.uint8)
labels = np.random.randint(0,2, size=(NUM_POINTS, 1))
return labels, images
def write_to_tfrecord(labels, images, tfrecord_file):
writer = tf.python_io.TFRecordWriter(tfrecord_file)
for i in range(NUM_POINTS):
example = tf.train.Example(features=tf.train.Features(feature={
'label':
tf.train.Feature(bytes_list=tf.train.BytesList(value=[labels[i].tobytes()])),
'image':
tf.train.Feature(bytes_list=tf.train.BytesList(value=[images[i].tobytes()]))
}))
writer.write(example.SerializeToString())
writer.close()
tfrecord_file = 'dummy.tfrecord'
labels, images= generate_image_binary()
write_to_tfrecord(labels, images, tfrecord_file)
Coordinator
可以捕获和处理类似tf.errors.OutOfRangeError
的异常,用于报告队列已关闭。
您可以更改代码以处理上述异常:
with tf.Session() as sess:
sess.run(init_op)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
try:
while not coord.should_stop():
labels, images = sess.run([lbls, imgs])
print(images.shape) #PRINTED 20 TIMES BUT FAILED AT THE 21ST
except Exception, e:
# When done, ask the threads to stop.
coord.request_stop(e)
finally:
coord.request_stop()
# Wait for threads to finish.
coord.join(threads)
我有 1000
个大小为 32x32x3
的图像存储在 dummy.tfrecord
文件中。我想遍历数据集两次(2 个时期),所以我指定 tf.train.string_input_producer([dummy.tfrecord], num_epochs=2)
。对于批量大小 100
,我希望 tf.train.shuffle_batch
到 运行 2 * 10 = 20
次迭代,因为它需要 10
批次 100
耗尽 1000
图片。
我遵循了 20
次迭代。然而,最后,我收到了错误:
RandomShuffleQueue '_1_shuffle_batch/random_shuffle_queue' is closed and has insufficient elements (requested 100, current size 0)
这是有道理的,因为队列中还有 0
张图片。
如何关闭队列并干净地退出?也就是应该不会出错。
完整脚本如下:
import numpy as np
import tensorflow as tf
NUM_IMGS = 1000
tfrecord_file = 'dummy.tfrecord'
def read_from_tfrecord(filenames):
tfrecord_file_queue = tf.train.string_input_producer(filenames,
num_epochs=2)
reader = tf.TFRecordReader()
_, tfrecord_serialized = reader.read(tfrecord_file_queue)
tfrecord_features = tf.parse_single_example(tfrecord_serialized,
features={
'label': tf.FixedLenFeature([], tf.string),
'image': tf.FixedLenFeature([], tf.string),
}, name='features')
image = tf.decode_raw(tfrecord_features['image'], tf.uint8)
image = tf.reshape(image, shape=(32, 32, 3))
label = tf.cast(tfrecord_features['label'], tf.string)
#provide batches
images, labels = tf.train.shuffle_batch([image, label],
batch_size=100,
num_threads=4,
capacity=50,
min_after_dequeue=1)
return images, labels
imgs, lbls = read_from_tfrecord([tfrecord_file])
init_op = tf.group(tf.global_variables_initializer(),
tf.local_variables_initializer())
with tf.Session() as sess:
sess.run(init_op)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
while not coord.should_stop():
labels, images = sess.run([lbls, imgs])
print(images.shape) #PRINTED 20 TIMES BUT FAILED AT THE 21ST
coord.request_stop()
coord.join(threads)
这里是生成dummy.tfrecord
文件的脚本,如果有人想复制:
def generate_image_binary():
images = np.random.randint(0,255, size=(NUM_POINTS, 32, 32, 3),
dtype=np.uint8)
labels = np.random.randint(0,2, size=(NUM_POINTS, 1))
return labels, images
def write_to_tfrecord(labels, images, tfrecord_file):
writer = tf.python_io.TFRecordWriter(tfrecord_file)
for i in range(NUM_POINTS):
example = tf.train.Example(features=tf.train.Features(feature={
'label':
tf.train.Feature(bytes_list=tf.train.BytesList(value=[labels[i].tobytes()])),
'image':
tf.train.Feature(bytes_list=tf.train.BytesList(value=[images[i].tobytes()]))
}))
writer.write(example.SerializeToString())
writer.close()
tfrecord_file = 'dummy.tfrecord'
labels, images= generate_image_binary()
write_to_tfrecord(labels, images, tfrecord_file)
Coordinator
可以捕获和处理类似tf.errors.OutOfRangeError
的异常,用于报告队列已关闭。
您可以更改代码以处理上述异常:
with tf.Session() as sess:
sess.run(init_op)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
try:
while not coord.should_stop():
labels, images = sess.run([lbls, imgs])
print(images.shape) #PRINTED 20 TIMES BUT FAILED AT THE 21ST
except Exception, e:
# When done, ask the threads to stop.
coord.request_stop(e)
finally:
coord.request_stop()
# Wait for threads to finish.
coord.join(threads)