如何在张量流中使用 QueueRunner 将动态创建的输入图像添加到 RandomShuffleQueue

How to add dynamically created input images to a RandomShuffleQueue using a QueueRunner in tensor flow

我正在尝试使用程序执行期间创建的图像来训练 CNN。我有一个游戏环境(不是我创建的),它根据游戏中的操作生成屏幕图像。这些动作由学习到的 CNN 控制。

然后将这些图像推入 RandomShuffleQueue,从中取出小批量并用于训练 CNN 采取正确的动作。 我想异步执行此操作(游戏和训练),其中正在玩游戏并将其屏幕添加到用于训练模型的单独线程中的 RandomShuffleQueue。这是我正在尝试的一个非常简化的版本。

import tensorflow as tf
from game_env import game


experience = tf.RandomShuffleQueue(10000,
                                1000, tf.float32,
                                shapes = [32,32],  
                                name = 'experience_replay')

def perceive(game):
    rawstate = game.grab_screen()
    enq = experience.enqueue(rawstate)
    return enq

#create threads to play the game and collect experience
available_threads = 4
coord = tf.train.Coordinator()
experience_runner = tf.train.QueueRunner(experience,
                                [perceive(game()) for num in range(available_threads)])

sess = tf.Session()
sess.run(tf.initialize_all_variables())
enqueue_threads = experience_runner.create_threads(sess, coord = coord, start = True)

with sess.as_default():
    while(1):
        print sess.run(experience.dequeue())
        time.sleep(.5)

与此同时,game_env 看起来像这样:

import tensorflow as tf
class game(object): 
    def __init__(self):
        self.screen_size = [32,32]
        self.counter = 0

    def grab_screen(self):
        """current screen of the game"""
        self.counter += 1
        screen = self.counter*tf.ones(self.screen_size)
        return screen

如您所见,目前的游戏环境非常简单:每次执行屏幕抓取时,计数器都会递增,并且填充计数器(正确大小)的图像为 return编辑。

需要注意的是,我写上面的 class 只是为了测试,一般来说 grab_screen 可以 return 任何 numpy nd 数组。而且它不是我写的,所以我可以调用 grab_screen 而不会在真实的东西中进行任何更改。

现在,问题是经验队列似乎只包含张量(即计数器只更新一次!!)

示例输出:

I tensorflow/core/common_runtime/local_device.cc:40] Local device intra op parallelism threads: 4
I tensorflow/core/common_runtime/direct_session.cc:58] Direct session inter op parallelism threads: 4

[[ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 ...,

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]]

[[ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 ...,

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]]

[[ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 ...,
 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]]

等等。 我的问题是:如何动态创建要像这样排队到 RandomShuffleQueue 的输入图像? 谢谢!

问题可以追溯到这一行,它定义了 tf.train.QueueRunner:

experience_runner = tf.train.QueueRunner(
    experience, [perceive(game()) for num in range(available_threads)])

这会创建四个 (available_threads) 操作,每次它们中的任何一个 运行 都会将一个填充了 1.0 的张量排入 experience 队列。 逐步了解列表理解中发生的事情应该使这一点更清楚。以下情况发生四次:

  1. 构造了一个game对象。
  2. 传递给perceive().
  3. perceive() 调用一次 game.grab_screen(),这会增加计数器,returns 一个张量 1 * tf.ones(self.screen_size)
  4. percieve() 将此张量传递给 experience.enqueue() 并 returns 生成运算符。

QueueRunner.create_threads() 调用为每个入队操作创建一个线程,并且这些 运行 处于无限循环中(当队列达到容量时阻塞)。

要获得预期的效果,您应该使用 feed mechanism and a placeholder 在每次排队体验时为抓取的屏幕传递不同的值。这取决于您的 game class 是如何实现的,但您可能还想初始化 class 的单个实例。最后,不清楚您是否需要多个排队线程,但我们假设 game.grab_screen() 是 thread-safe 并允许一些并发。考虑到所有这些,一个合理的版本如下所示(请注意,您需要创建自定义线程而不是 QueueRunner 才能使用喂食):

import tensorflow as tf
from game_env import game

experience = tf.RandomShuffleQueue(10000,
                                   1000, tf.float32,
                                   shapes=[32,32],  
                                   name='experience_replay')

screen_placeholder = tf.placeholder(tf.float32, [32, 32])
# You can create a single enqueue op and dequeued tensor, and reuse these from
# multiple threads.
enqueue_op = experience.enqueue(screen_placeholder)
dequeued_t = experience.dequeue()
# ...

init_op = tf.initialize_all_variables()

game_obj = game()

sess = tf.Session()
coord = tf.train.Coordinator()

# Define a custom thread for running the enqueue op that grabs a new
# screen in a loop and feeds it to the placeholder.
def enqueue_thread():
    with coord.stop_on_exception():
        while not coord.should_stop():
            screen_val = game_obj.grab_screen()
            # Run the same op, but feed a different value for the screen.
            sess.run(enqueue_op, feed_dict={screen_placeholder: screen_val}) 

available_threads = 4
for _ in range(available_threads):
    threading.Thread(target=enqueue_thread).start()


while True:
    # N.B. It's more efficient to reuse the same dequeue op in a loop.
    print sess.run(dequeued_t)
    time.sleep(0.5)