为什么 Tensorflow tf.FIFOQueue 在以下代码中提前关闭?
Why does the Tensorflow tf.FIFOQueue close early in the following code?
我正在尝试实现一个队列,该队列在后台具有 enqueue
运行ning,在主线程中具有 dequeue
运行ning。
目标是 运行 循环中的优化器取决于存储在缓冲区中的值,并且仅随着优化中的每个步骤而变化。下面用一个简单的例子来说明:
VarType = tf.int32
data0 = np.array([1.0])
init = tf.placeholder(VarType, [1])
q = tf.FIFOQueue(capacity=1, shapes=[1], dtypes=VarType)
nq_init = q.enqueue(init)
# I use a Variable intermediary because I will want to access the
# data multiple times, but I do not want the next data point in the
# queue until I initialize the variable again.
data_ = tf.Variable(q.dequeue(), trainable=False, collections=[])
# Notice that data_ is accessed twice, but should be the same
# in a single sess.run
# so "data_ = q.dequeue()" would not be correct
# plus there needs to be access to initial data
data1 = data_ + 1
data2 = data_ * data1
qr = tf.train.QueueRunner(q, [q.enqueue(data2)] * 1)
tf.train.add_queue_runner(qr)
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
sess.run(nq_init, feed_dict={init:data0})
# this first initialization works fine
sess.run(data_.initializer)
for n in range(10):
print(sess.run(data2))
# this second initialization errors out:
sess.run(data_.initializer)
coord.request_stop()
coord.join(threads)
print('Done')
这段代码出错,错误如下:
"OutOfRangeError (see above for traceback): FIFOQueue '_0_fifo_queue' is closed and has insufficient elements (requested 1, current size 0)"
为什么,这是如何修复的?
所以我找到了 "how to fix it" 部分,但没有找到原因。
似乎第一个 enqueue/dequeue 必须是 运行,然后第二个 enqueue/dequeue 才被放入 QUEUE_RUNNERS 集合 - 但需要注意的是,我们需要 运行 sess.run(data_.initializer)
两次:
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
sess.run(nq_init, feed_dict={init:data0})
sess.run(data_.initializer)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for n in range(10):
print(sess.run(data2))
sess.run(data_.initializer)
sess.run(data_.initializer)
coord.request_stop()
coord.join(threads)
输出符合预期:
[2]; [6]; [42];...
没有这两个电话,我得到以下信息:
[2]; [6]; [6]; [42];...
我怀疑 q.enqueue
有自己的缓冲区来保存旧的 data2
,所以它必须被调用两次。这也符合第一个不重复的值,因为那时第二个 q.enqueue
仍然是空的。不确定如何克服这一怪癖。
我正在尝试实现一个队列,该队列在后台具有 enqueue
运行ning,在主线程中具有 dequeue
运行ning。
目标是 运行 循环中的优化器取决于存储在缓冲区中的值,并且仅随着优化中的每个步骤而变化。下面用一个简单的例子来说明:
VarType = tf.int32
data0 = np.array([1.0])
init = tf.placeholder(VarType, [1])
q = tf.FIFOQueue(capacity=1, shapes=[1], dtypes=VarType)
nq_init = q.enqueue(init)
# I use a Variable intermediary because I will want to access the
# data multiple times, but I do not want the next data point in the
# queue until I initialize the variable again.
data_ = tf.Variable(q.dequeue(), trainable=False, collections=[])
# Notice that data_ is accessed twice, but should be the same
# in a single sess.run
# so "data_ = q.dequeue()" would not be correct
# plus there needs to be access to initial data
data1 = data_ + 1
data2 = data_ * data1
qr = tf.train.QueueRunner(q, [q.enqueue(data2)] * 1)
tf.train.add_queue_runner(qr)
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
sess.run(nq_init, feed_dict={init:data0})
# this first initialization works fine
sess.run(data_.initializer)
for n in range(10):
print(sess.run(data2))
# this second initialization errors out:
sess.run(data_.initializer)
coord.request_stop()
coord.join(threads)
print('Done')
这段代码出错,错误如下:
"OutOfRangeError (see above for traceback): FIFOQueue '_0_fifo_queue' is closed and has insufficient elements (requested 1, current size 0)"
为什么,这是如何修复的?
所以我找到了 "how to fix it" 部分,但没有找到原因。
似乎第一个 enqueue/dequeue 必须是 运行,然后第二个 enqueue/dequeue 才被放入 QUEUE_RUNNERS 集合 - 但需要注意的是,我们需要 运行 sess.run(data_.initializer)
两次:
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
sess.run(nq_init, feed_dict={init:data0})
sess.run(data_.initializer)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for n in range(10):
print(sess.run(data2))
sess.run(data_.initializer)
sess.run(data_.initializer)
coord.request_stop()
coord.join(threads)
输出符合预期:
[2]; [6]; [42];...
没有这两个电话,我得到以下信息:
[2]; [6]; [6]; [42];...
我怀疑 q.enqueue
有自己的缓冲区来保存旧的 data2
,所以它必须被调用两次。这也符合第一个不重复的值,因为那时第二个 q.enqueue
仍然是空的。不确定如何克服这一怪癖。