read_cifar10() 如何例程 return 除了 TensorFlow 教程中的第一个对象之外的任何东西?

How can read_cifar10() routine return anything other than first object in TensorFlow tutorial?

TensorFlow 有 CIFAR-10 教程,is discussed here. Source code in Python is here

它有read_cifar10() routine here,用于从二进制文件中读取样本。

我不明白它是如何工作的。怀疑这在某种程度上与 TensorFlow 延迟性质有关,但无法弄清楚如何。

在某些时候例程会执行以下操作:

# Read a record, getting filenames from the filename_queue.  No
  # header or footer in the CIFAR-10 format, so we leave header_bytes
  # and footer_bytes at their default of 0.
  reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
  result.key, value = reader.read(filename_queue)

我在这里看到,一个新的 reader 从头开始​​创建,然后这个 reader 指向文件名队列。

read 调用返回了多少样本?

稍后,在 distorted_inputs() 方法中,代码执行以下操作:

print ('Filling queue with %d CIFAR images before starting to train. '
         'This will take a few minutes.' % min_queue_examples)

  # Generate a batch of images and labels by building up a queue of examples.
  return _generate_image_and_label_batch(float_image, read_input.label,
                                         min_queue_examples)

这里 print 是正常的 Python 调用,不是延迟的,所以注释假定将立即获取 20000 条记录。

怎么会这样?我到处都只看到每条记录的逻辑。它如何乘以许多记录?

TLDR; reader.read 仅向计算图添加 read 操作,实际执行发生在 session.run 期间,由 while(True): session.run(...) 类型的循环中的单独线程完成 start_queue_runners =]

长版: 这是 "input pipeline" 的一部分,由于 reading/prefetching 需要异步发生以避免阻塞这一事实而变得复杂。描述输入管道的官方 how-to 是 here

更具体地说,reader.read在计算图中添加了读取单个记录的操作。然后,此操作输入 _generate_image_and_label_batch 中创建的 shuffle_batch。到目前为止,还没有进行任何阅读。 shuffle_batch 操作创建了一个队列,它解耦了输入流,在某种意义上,可以使用不同的 session.run 调用异步完成对队列之前和队列之后的图形部分的评估,队列提供中间缓冲。此外,shuffle_batch 操作将输入队列的操作注册为 GraphKeys.QUEUE_RUNNERS 集合的一部分。

train() 中,操作 tf.start_queue_runners 将创建几个与在 GraphKeys.QUEUE_RUNNERS 集合中注册的入队操作相对应的线程,并开始在循环中评估它们。 reader.read 的结果将流经其他操作,直到到达 shuffle_batch 队列并保存在其内存缓冲区中。

shuffle_batch 之后的图表部分将由 Python 主线程驱动,由 sess.run([train_op, loss]) 命令启动。该线程将收集保存在 shuffle_batch 队列中的一批示例并将其向前传播。

这是一个手动输入队列而不是使用队列 运行ners 的示例。

queue_dtype = np.int32
queue_capacity = 2
values_queue = tf.FIFOQueue(capacity=queue_capacity, dtypes=queue_dtype)
size_op = values_queue.size()
value_placeholder = tf.placeholder(dtype=queue_dtype)
enqueue_op = values_queue.enqueue(value_placeholder)
dequeue_op = values_queue.dequeue()
close_op = values_queue.close()

sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())

# add two elements onto the queue
sess.run([enqueue_op], {value_placeholder:2})
sess.run([enqueue_op], {value_placeholder:3})
# if you uncomment the next line, you'll hang because queue is full
# sess.run([enqueue_op], {value_placeholder:4})

# close the queue. This means 3rd read will throw OutOfRangeError instead of
# hanging until queue is replenished
sess.run([close_op])
print('queue has %d/%d entries' % (sess.run([size_op])[0], queue_capacity))

# take two elements off the queue
fancy_computation = tf.square(dequeue_op)
print('Computation result %d' %(sess.run([fancy_computation])[0]))
print('queue has %d/%d entries' % (sess.run([size_op])[0], queue_capacity))
print('Computation result %d' %(sess.run([fancy_computation])[0]))
print('queue has %d/%d entries' % (sess.run([size_op])[0], queue_capacity))
print('Computation result %d' %(sess.run([fancy_computation])[0]))
print('queue has %d/%d entries' % (sess.run([size_op])[0], queue_capacity))

如果你 运行 它应该看到什么

queue has 2/2 entries
Computation result 4
queue has 1/2 entries
Computation result 9
queue has 0/2 entries
---------------------------------------------------------------------------
OutOfRangeError