如何在张量流中使用 QueueRunner 将动态创建的输入图像添加到 RandomShuffleQueue
How to add dynamically created input images to a RandomShuffleQueue using a QueueRunner in tensor flow
我正在尝试使用程序执行期间创建的图像来训练 CNN。我有一个游戏环境(不是我创建的),它根据游戏中的操作生成屏幕图像。这些动作由学习到的 CNN 控制。
然后将这些图像推入 RandomShuffleQueue,从中取出小批量并用于训练 CNN 采取正确的动作。
我想异步执行此操作(游戏和训练),其中正在玩游戏并将其屏幕添加到用于训练模型的单独线程中的 RandomShuffleQueue。这是我正在尝试的一个非常简化的版本。
import tensorflow as tf
from game_env import game
experience = tf.RandomShuffleQueue(10000,
1000, tf.float32,
shapes = [32,32],
name = 'experience_replay')
def perceive(game):
rawstate = game.grab_screen()
enq = experience.enqueue(rawstate)
return enq
#create threads to play the game and collect experience
available_threads = 4
coord = tf.train.Coordinator()
experience_runner = tf.train.QueueRunner(experience,
[perceive(game()) for num in range(available_threads)])
sess = tf.Session()
sess.run(tf.initialize_all_variables())
enqueue_threads = experience_runner.create_threads(sess, coord = coord, start = True)
with sess.as_default():
while(1):
print sess.run(experience.dequeue())
time.sleep(.5)
与此同时,game_env 看起来像这样:
import tensorflow as tf
class game(object):
def __init__(self):
self.screen_size = [32,32]
self.counter = 0
def grab_screen(self):
"""current screen of the game"""
self.counter += 1
screen = self.counter*tf.ones(self.screen_size)
return screen
如您所见,目前的游戏环境非常简单:每次执行屏幕抓取时,计数器都会递增,并且填充计数器(正确大小)的图像为 return编辑。
需要注意的是,我写上面的 class 只是为了测试,一般来说 grab_screen 可以 return 任何 numpy nd 数组。而且它不是我写的,所以我可以调用 grab_screen 而不会在真实的东西中进行任何更改。
现在,问题是经验队列似乎只包含张量(即计数器只更新一次!!)
示例输出:
I tensorflow/core/common_runtime/local_device.cc:40] Local device intra op parallelism threads: 4
I tensorflow/core/common_runtime/direct_session.cc:58] Direct session inter op parallelism threads: 4
[[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
...,
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]]
[[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
...,
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]]
[[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
...,
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]]
等等。
我的问题是:如何动态创建要像这样排队到 RandomShuffleQueue 的输入图像?
谢谢!
问题可以追溯到这一行,它定义了 tf.train.QueueRunner
:
experience_runner = tf.train.QueueRunner(
experience, [perceive(game()) for num in range(available_threads)])
这会创建四个 (available_threads
) 操作,每次它们中的任何一个 运行 都会将一个填充了 1.0 的张量排入 experience
队列。
逐步了解列表理解中发生的事情应该使这一点更清楚。以下情况发生四次:
- 构造了一个
game
对象。
- 传递给
perceive()
.
perceive()
调用一次 game.grab_screen()
,这会增加计数器,returns 一个张量 1 * tf.ones(self.screen_size)
percieve()
将此张量传递给 experience.enqueue()
并 returns 生成运算符。
QueueRunner.create_threads()
调用为每个入队操作创建一个线程,并且这些 运行 处于无限循环中(当队列达到容量时阻塞)。
要获得预期的效果,您应该使用 feed mechanism and a placeholder 在每次排队体验时为抓取的屏幕传递不同的值。这取决于您的 game
class 是如何实现的,但您可能还想初始化 class 的单个实例。最后,不清楚您是否需要多个排队线程,但我们假设 game.grab_screen()
是 thread-safe 并允许一些并发。考虑到所有这些,一个合理的版本如下所示(请注意,您需要创建自定义线程而不是 QueueRunner
才能使用喂食):
import tensorflow as tf
from game_env import game
experience = tf.RandomShuffleQueue(10000,
1000, tf.float32,
shapes=[32,32],
name='experience_replay')
screen_placeholder = tf.placeholder(tf.float32, [32, 32])
# You can create a single enqueue op and dequeued tensor, and reuse these from
# multiple threads.
enqueue_op = experience.enqueue(screen_placeholder)
dequeued_t = experience.dequeue()
# ...
init_op = tf.initialize_all_variables()
game_obj = game()
sess = tf.Session()
coord = tf.train.Coordinator()
# Define a custom thread for running the enqueue op that grabs a new
# screen in a loop and feeds it to the placeholder.
def enqueue_thread():
with coord.stop_on_exception():
while not coord.should_stop():
screen_val = game_obj.grab_screen()
# Run the same op, but feed a different value for the screen.
sess.run(enqueue_op, feed_dict={screen_placeholder: screen_val})
available_threads = 4
for _ in range(available_threads):
threading.Thread(target=enqueue_thread).start()
while True:
# N.B. It's more efficient to reuse the same dequeue op in a loop.
print sess.run(dequeued_t)
time.sleep(0.5)
我正在尝试使用程序执行期间创建的图像来训练 CNN。我有一个游戏环境(不是我创建的),它根据游戏中的操作生成屏幕图像。这些动作由学习到的 CNN 控制。
然后将这些图像推入 RandomShuffleQueue,从中取出小批量并用于训练 CNN 采取正确的动作。 我想异步执行此操作(游戏和训练),其中正在玩游戏并将其屏幕添加到用于训练模型的单独线程中的 RandomShuffleQueue。这是我正在尝试的一个非常简化的版本。
import tensorflow as tf
from game_env import game
experience = tf.RandomShuffleQueue(10000,
1000, tf.float32,
shapes = [32,32],
name = 'experience_replay')
def perceive(game):
rawstate = game.grab_screen()
enq = experience.enqueue(rawstate)
return enq
#create threads to play the game and collect experience
available_threads = 4
coord = tf.train.Coordinator()
experience_runner = tf.train.QueueRunner(experience,
[perceive(game()) for num in range(available_threads)])
sess = tf.Session()
sess.run(tf.initialize_all_variables())
enqueue_threads = experience_runner.create_threads(sess, coord = coord, start = True)
with sess.as_default():
while(1):
print sess.run(experience.dequeue())
time.sleep(.5)
与此同时,game_env 看起来像这样:
import tensorflow as tf
class game(object):
def __init__(self):
self.screen_size = [32,32]
self.counter = 0
def grab_screen(self):
"""current screen of the game"""
self.counter += 1
screen = self.counter*tf.ones(self.screen_size)
return screen
如您所见,目前的游戏环境非常简单:每次执行屏幕抓取时,计数器都会递增,并且填充计数器(正确大小)的图像为 return编辑。
需要注意的是,我写上面的 class 只是为了测试,一般来说 grab_screen 可以 return 任何 numpy nd 数组。而且它不是我写的,所以我可以调用 grab_screen 而不会在真实的东西中进行任何更改。
现在,问题是经验队列似乎只包含张量(即计数器只更新一次!!)
示例输出:
I tensorflow/core/common_runtime/local_device.cc:40] Local device intra op parallelism threads: 4
I tensorflow/core/common_runtime/direct_session.cc:58] Direct session inter op parallelism threads: 4
[[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
...,
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]]
[[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
...,
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]]
[[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
...,
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]]
等等。 我的问题是:如何动态创建要像这样排队到 RandomShuffleQueue 的输入图像? 谢谢!
问题可以追溯到这一行,它定义了 tf.train.QueueRunner
:
experience_runner = tf.train.QueueRunner(
experience, [perceive(game()) for num in range(available_threads)])
这会创建四个 (available_threads
) 操作,每次它们中的任何一个 运行 都会将一个填充了 1.0 的张量排入 experience
队列。
逐步了解列表理解中发生的事情应该使这一点更清楚。以下情况发生四次:
- 构造了一个
game
对象。 - 传递给
perceive()
. perceive()
调用一次game.grab_screen()
,这会增加计数器,returns 一个张量1 * tf.ones(self.screen_size)
percieve()
将此张量传递给experience.enqueue()
并 returns 生成运算符。
QueueRunner.create_threads()
调用为每个入队操作创建一个线程,并且这些 运行 处于无限循环中(当队列达到容量时阻塞)。
要获得预期的效果,您应该使用 feed mechanism and a placeholder 在每次排队体验时为抓取的屏幕传递不同的值。这取决于您的 game
class 是如何实现的,但您可能还想初始化 class 的单个实例。最后,不清楚您是否需要多个排队线程,但我们假设 game.grab_screen()
是 thread-safe 并允许一些并发。考虑到所有这些,一个合理的版本如下所示(请注意,您需要创建自定义线程而不是 QueueRunner
才能使用喂食):
import tensorflow as tf
from game_env import game
experience = tf.RandomShuffleQueue(10000,
1000, tf.float32,
shapes=[32,32],
name='experience_replay')
screen_placeholder = tf.placeholder(tf.float32, [32, 32])
# You can create a single enqueue op and dequeued tensor, and reuse these from
# multiple threads.
enqueue_op = experience.enqueue(screen_placeholder)
dequeued_t = experience.dequeue()
# ...
init_op = tf.initialize_all_variables()
game_obj = game()
sess = tf.Session()
coord = tf.train.Coordinator()
# Define a custom thread for running the enqueue op that grabs a new
# screen in a loop and feeds it to the placeholder.
def enqueue_thread():
with coord.stop_on_exception():
while not coord.should_stop():
screen_val = game_obj.grab_screen()
# Run the same op, but feed a different value for the screen.
sess.run(enqueue_op, feed_dict={screen_placeholder: screen_val})
available_threads = 4
for _ in range(available_threads):
threading.Thread(target=enqueue_thread).start()
while True:
# N.B. It's more efficient to reuse the same dequeue op in a loop.
print sess.run(dequeued_t)
time.sleep(0.5)