使用 FIFOQueue 时在 Tensorflow 中进行小批量训练

Mini-batch training in Tensorflow when using FIFOQueue

我正在使用 Tensorflow 中的 tf.train.GradientDescentOptimizer() 训练线性回归问题。一般来说,我可以使用 placeholders 和 feed_dict={} 每次输入一批样本并训练权重 W。但是,我想使用 tf.FIFOQueue 而不是 feed_dict。例如,在下面的代码中,我输入 X 和 Y 并训练权重 W:

# Question code: linear regression fed through two FIFOQueues (TF 1.x graph mode).
# As written it does NOT train on mini-batches; see the notes on enqueue/dequeue below.

# Problem size: feature dimension, number of samples, desired mini-batch size.
# NOTE: batch_size is defined here but never used anywhere in this snippet.
v_dimen = 300
n_samples = 10000
batch_size = 32
# Random stand-ins for the full input matrix X and the target column Y.
X = tf.random_normal([n_samples, v_dimen], mean=0, stddev=1) 
Y = tf.random_normal([n_samples, 1], mean=0, stddev=1) 

# Input queue. `capacity` is the maximum number of elements the queue can hold.
# `enqueue(X)` adds the whole X tensor as ONE element, so every element of this
# queue is a full [n_samples, v_dimen] tensor rather than a mini-batch.
q_in = tf.FIFOQueue(capacity=5, dtypes=tf.float32) # enqueue 5 batches
enqueue_op = q_in.enqueue(X)
numberOfThreads = 1
# The QueueRunner runs the enqueue op in background thread(s) once started.
qr = tf.train.QueueRunner(q_in, [enqueue_op] * numberOfThreads)
tf.train.add_queue_runner(qr)
# `dequeue()` returns one element, i.e. the complete X tensor that was enqueued.
X_batch = q_in.dequeue()    

# Target queue, built the same way as the input queue (whole Y per element).
q_out = tf.FIFOQueue(capacity=5, dtypes=tf.float32)  # enqueue 5 batches
enqueue_op = q_out.enqueue(Y)
numberOfThreads = 1
qr = tf.train.QueueRunner(q_out, [enqueue_op] * numberOfThreads)
tf.train.add_queue_runner(qr)
Y_batch = q_out.dequeue() 

# Trainable weight column, initialised with small truncated-normal noise.
W = tf.Variable(tf.random.truncated_normal((v_dimen, 1), mean=0.0,stddev=0.001))
# NOTE: `f` is not defined in this snippet; it stands for the model function.
predicted_Y = f(X_batch) # some function on X, like tf.matmul(X_batch,W)
loss = tf.nn.l2_loss(Y_batch - predicted_Y)
# Despite its name, `optimizer` holds the training op returned by minimize().
optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.01).minimize(loss, var_list=[W])
init = tf.global_variables_initializer()

with tf.Session() as sess:
    sess.run(init)
    # The coordinator manages the background enqueue threads started here.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for i in range(10000):
        sess.run([optimizer]) # would like to run on mini batches

    # Ask the enqueue threads to stop and wait for them to finish.
    coord.request_stop()
    coord.join(threads)

我想知道如何更改代码,以便能够使用 X_batch 和 Y_batch 在大小为 batch_size 的小批量中训练 W。

使用tf.data的代码(带注释):

import tensorflow as tf

# Mini-batch training of a linear model using the tf.data input pipeline
# (TF 1.x graph mode).

# Problem size: feature dimension, number of samples, mini-batch size.
v_dimen = 300
n_samples = 10000
batch_size = 32
X = tf.random_normal([n_samples, v_dimen], mean=0, stddev=1)
Y = tf.random_normal([n_samples, 1], mean=0, stddev=1)

# X and Y are sampled once and stay fixed after the dataset has been created.
# from_tensor_slices slices both tensors along axis 0 into (x_row, y_row) pairs.
dataset = tf.data.Dataset.from_tensor_slices((X, Y))
# dataset = dataset.shuffle(n_samples)  # shuffle
# Without repeat(), get_next() raises OutOfRangeError once all samples are consumed.
dataset = dataset.repeat()
dataset = dataset.batch(batch_size)  # group consecutive samples into batches of batch_size
iterator = dataset.make_initializable_iterator()
X_batch, Y_batch = iterator.get_next()  # plays the role of dequeue: yields the next batch

# Trainable weight column, initialised with small truncated-normal noise.
W = tf.Variable(tf.random.truncated_normal((v_dimen, 1), mean=0.0, stddev=0.001))
predicted_Y = tf.matmul(X_batch, W)  # some function on X, like tf.matmul(X_batch,W)
loss = tf.nn.l2_loss(Y_batch - predicted_Y)
# Despite its name, `optimizer` holds the training op returned by minimize().
optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.01).minimize(loss, var_list=[W])
# iterator.initializer must be run before get_next() can be evaluated.
init = [tf.global_variables_initializer(), iterator.initializer]

with tf.Session() as sess:
    sess.run(init)
    for i in range(1000):
        # Fetching X_batch/Y_batch in the same run() call returns the batch used for this step.
        _, x, y = sess.run([optimizer, X_batch, Y_batch])
        # With shuffle disabled, y[0] repeats every 625 iterations:
        # 625 * 32 = 20000 samples = exactly 2 passes over the 10000 samples.
        print(i, x.shape, y.shape, y[0])

如果您想使用将被弃用的 queue,请参阅下面的代码(带注释)

import tensorflow as tf

# Mini-batch training of a linear model fed by two FIFOQueue input pipelines
# (TF 1.x graph mode; the queue-based input API is deprecated in favour of tf.data).

v_dimen = 300
n_samples = 100  # rows generated per enqueue call; keep this modest
batch_size = 32
# X and Y are random ops, so new values are generated each time an enqueue op runs.
X = tf.random_normal([n_samples, v_dimen], mean=0, stddev=1)
Y = tf.random_normal([n_samples, 1], mean=0, stddev=1)

# Queue notes:
# * `capacity` is unrelated to the batch size: it is only the upper bound on the
#   number of elements that may be stored in the queue.
# * `dequeue_many(batch_size)` is what sets the batch size. It stacks `batch_size`
#   elements along a new 0th dimension, so the per-element `shapes` must be given.
# * For the same reason `enqueue_many` is used: it slices its input along the 0th
#   dimension so that every row becomes one queue element.
# See the documentation of `FIFOQueue`, `enqueue_many` and `dequeue_many`.
numberOfThreads = 1

# Input pipeline: each queue element is one row of X with shape [v_dimen].
q_in = tf.FIFOQueue(capacity=50, dtypes=tf.float32, shapes=[v_dimen])
enqueue_x = q_in.enqueue_many(X)
tf.train.add_queue_runner(
    tf.train.QueueRunner(q_in, [enqueue_x] * numberOfThreads))
X_batch = q_in.dequeue_many(batch_size)

# Target pipeline: each queue element is one row of Y with shape [1].
q_out = tf.FIFOQueue(capacity=50, dtypes=tf.float32, shapes=[1])
enqueue_y = q_out.enqueue_many(Y)
tf.train.add_queue_runner(
    tf.train.QueueRunner(q_out, [enqueue_y] * numberOfThreads))
Y_batch = q_out.dequeue_many(batch_size)

# Trainable weight column, initialised with small truncated-normal noise.
W = tf.Variable(tf.random.truncated_normal((v_dimen, 1), mean=0.0, stddev=0.001))
predicted_Y = tf.matmul(X_batch, W)  # some function on X, like tf.matmul(X_batch,W)
loss = tf.nn.l2_loss(Y_batch - predicted_Y)
# Despite its name, `optimizer` holds the training op returned by minimize().
optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.01).minimize(loss, var_list=[W])
init = tf.global_variables_initializer()

with tf.Session() as sess:
    sess.run(init)
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    try:
        for _ in range(1000):
            # Each step dequeues one batch of X and Y and applies one update to W.
            sess.run(optimizer)
    finally:
        # Always stop and join the enqueue threads, even if a training step
        # raised, so they are shut down before the session is closed.
        coord.request_stop()
        coord.join(threads)