使用队列从多个输入文件中统一采样
Using Queues to uniformly sample from multiple input files
我的数据集中每个 class 都有一个序列化文件。我想使用队列来加载这些文件中的每一个,然后将它们放在一个 RandomShuffleQueue 中,这样我就可以从每个 class 中随机混合示例。我认为这段代码可以工作。
在此示例中,每个文件有 10 个示例。
filenames = ["a", "b", ...]
with self.test_session() as sess:
# for each file open a queue and get that
# queue's results.
strings = []
rq = tf.RandomShuffleQueue(1000, 10, [tf.string], shapes=())
for filename in filenames:
q = tf.FIFOQueue(99, [tf.string], shapes=())
q.enqueue([filename]).run()
q.close().run()
# read_string just pulls a string from the file
key, out_string = input_data.read_string(q, IMAGE_SIZE, CHANNELS, LABEL_BYTES)
strings.append(out_string)
rq.enqueue([out_string]).run()
rq.close().run()
qs = rq.dequeue()
label, image = input_data.string_to_data(qs, IMAGE_SIZE, CHANNELS, LABEL_BYTES)
for i in range(11):
l, im = sess.run([label, image])
print("L: {}".format(l)
这对 10 个调用来说工作正常,但在第 11 个调用时它说队列是空的。
我认为这是因为我对这些队列的操作有误解。我向 RandomShuffleQueue
添加了 10 个变量,但这些变量中的每一个本身都是从队列中提取的,所以我假设在每个文件队列为空之前队列不会被清空。
我做错了什么?
此问题的正确答案取决于您拥有的文件数量、文件大小以及文件大小的分布方式。
您的示例的直接问题是 rq
只为每个 filename in filenames
获取一个元素,然后队列关闭。我假设有 10 个 filenames
,因为每次调用 sess.run([label, image])
时 rq.dequeue()
将消耗 rq
的一个元素。由于队列关闭,无法再添加元素,第11次激活rq.dequeue()
操作失败
一般的解决方案是你必须创建额外的线程来保持 运行 rq.enqueue([out_string])
在一个循环中。 TensorFlow 包含一个旨在简化此操作的 QueueRunner
class,以及其他一些处理常见情况的函数。 documentation for threading and queues explains how they are used, and there is also some good information on using queues to read from files.
至于您的特定问题,处理此问题的一种方法是创建 N
个读取器(针对每个 N
个文件)。然后你可以 RandomShuffleQueue
上的 tf.pack()
N
elements (one from each reader) into a batch, and use enqueue_many
to add a batch at a time into a tf.RandomShuffleQueue
with a sufficiently large capacity and min_after_dequeue
to ensure that there is sufficient mixing between the classes. Calling dequeue_many(k)
会给你一批 k
从每个文件中以相同概率采样的元素。
我的数据集中每个 class 都有一个序列化文件。我想使用队列来加载这些文件中的每一个,然后将它们放在一个 RandomShuffleQueue 中,这样我就可以从每个 class 中随机混合示例。我认为这段代码可以工作。
在此示例中,每个文件有 10 个示例。
filenames = ["a", "b", ...]
with self.test_session() as sess:
# for each file open a queue and get that
# queue's results.
strings = []
rq = tf.RandomShuffleQueue(1000, 10, [tf.string], shapes=())
for filename in filenames:
q = tf.FIFOQueue(99, [tf.string], shapes=())
q.enqueue([filename]).run()
q.close().run()
# read_string just pulls a string from the file
key, out_string = input_data.read_string(q, IMAGE_SIZE, CHANNELS, LABEL_BYTES)
strings.append(out_string)
rq.enqueue([out_string]).run()
rq.close().run()
qs = rq.dequeue()
label, image = input_data.string_to_data(qs, IMAGE_SIZE, CHANNELS, LABEL_BYTES)
for i in range(11):
l, im = sess.run([label, image])
print("L: {}".format(l)
这对 10 个调用来说工作正常,但在第 11 个调用时它说队列是空的。
我认为这是因为我对这些队列的操作有误解。我向 RandomShuffleQueue
添加了 10 个变量,但这些变量中的每一个本身都是从队列中提取的,所以我假设在每个文件队列为空之前队列不会被清空。
我做错了什么?
此问题的正确答案取决于您拥有的文件数量、文件大小以及文件大小的分布方式。
您的示例的直接问题是 rq
只为每个 filename in filenames
获取一个元素,然后队列关闭。我假设有 10 个 filenames
,因为每次调用 sess.run([label, image])
时 rq.dequeue()
将消耗 rq
的一个元素。由于队列关闭,无法再添加元素,第11次激活rq.dequeue()
操作失败
一般的解决方案是你必须创建额外的线程来保持 运行 rq.enqueue([out_string])
在一个循环中。 TensorFlow 包含一个旨在简化此操作的 QueueRunner
class,以及其他一些处理常见情况的函数。 documentation for threading and queues explains how they are used, and there is also some good information on using queues to read from files.
至于您的特定问题,处理此问题的一种方法是创建 N
个读取器(针对每个 N
个文件)。然后你可以 RandomShuffleQueue
上的 tf.pack()
N
elements (one from each reader) into a batch, and use enqueue_many
to add a batch at a time into a tf.RandomShuffleQueue
with a sufficiently large capacity and min_after_dequeue
to ensure that there is sufficient mixing between the classes. Calling dequeue_many(k)
会给你一批 k
从每个文件中以相同概率采样的元素。