How to use Tensorflow Optimizer without recomputing activations in reinforcement learning program that returns control after each iteration?
Edit (1/3/16): corresponding github issue
I am using Tensorflow (the Python interface) to implement a q-learning agent with function approximation, trained by stochastic gradient descent.

At each iteration of the experiment, a step function in the agent is called, which updates the parameters of the approximator based on the new reward and activation, and then chooses a new action to perform.
Here is where the problem lies (in reinforcement learning jargon):
- The agent computes its state-action value predictions in order to choose an action.
- Control then returns to another program, which simulates a step in the environment.
- Now the agent's step function is called for the next iteration. I want to use Tensorflow's Optimizer class to compute the gradients for me. However, this requires both the state-action value predictions I computed in the last step AND their graph. So:
- If I run the optimizer on the whole graph, it has to recompute the state-action value predictions.
- But if I store the prediction (for the chosen action) as a variable and then feed it to the optimizer as a placeholder, the optimizer no longer has the graph it needs to compute the gradients.
- I can't just run it all in the same sess.run() statement, because I have to give up control and return the chosen action in order to get the next observation and reward (which go into the target for the loss function).
So, is there any way that I can (without reinforcement learning jargon):
- Compute part of my graph, returning value1.
- Return value1 to the calling program, which computes value2.
- In the next iteration, use value2 as part of my loss function for gradient descent, WITHOUT recomputing the part of the graph that produced value1 (see the sketch just after this list).
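To make the pattern concrete, here is a self-contained toy example (a made-up graph, not my real agent) showing the control flow I want and where the recomputation happens:

import tensorflow as tf

x = tf.placeholder(tf.float32, shape=[])
w = tf.Variable(2.0)
value1_op = w * x                                   # "part of my graph"
target = tf.placeholder(tf.float32, shape=[])       # will hold value2 from the caller
loss = tf.square(target - value1_op)                # gradients need value1_op's graph
train_op = tf.train.GradientDescentOptimizer(0.1).minimize(loss)

sess = tf.Session()
sess.run(tf.initialize_all_variables())

value1 = sess.run(value1_op, feed_dict={x: 3.0})    # compute value1 and return it
value2 = value1 + 1.0                               # calling program computes value2

# Next iteration: this trains correctly, but re-executes everything behind value1_op:
sess.run(train_op, feed_dict={x: 3.0, target: value2})

# Feeding the stored number instead severs the graph, so no gradients can flow:
# sess.run(train_op, feed_dict={value1_op: value1, target: value2})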
Of course, I have considered the obvious solutions:

Just hardcode the gradients: that is easy for the really simple approximator I'm using now, but it would be very inconvenient if I were experimenting with different filters and activation functions in a big convolutional network. I would really like to use the Optimizer class if at all possible (a rough sketch of what this option would involve is below).

Call the environment simulation from inside the agent: This system does that, but it would make mine more complicated and strip away a lot of the modularity and structure. So I don't want to do this.
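For concreteness, hardcoding the gradients would look roughly like this: derive d(loss)/d(parameter) by hand outside the graph and have the optimizer only apply them. (W, b and learning_rate refer to the code further down; the gradient placeholders and numpy_grads are hypothetical.)

params = [W[0], W[1], b[0], b[1]]
# One placeholder per parameter; these would be filled with hand-derived numpy gradients.
grad_placeholders = [tf.placeholder(tf.float32, shape=p.get_shape()) for p in params]
opt = tf.train.GradientDescentOptimizer(learning_rate)
apply_grads = opt.apply_gradients(list(zip(grad_placeholders, params)))

# Each step, after computing numpy_grads by hand for the squared TD error:
# sess.run(apply_grads, feed_dict=dict(zip(grad_placeholders, numpy_grads)))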
I have read through the API and the whitepaper several times, but can't seem to come up with a solution. I was trying to find some way of feeding the target into the graph to compute the gradients, but couldn't come up with a way to build that graph automatically.
If it turns out this isn't possible in TensorFlow, do you think it would be very complicated to implement it as a new operator? (I haven't used C++ in a couple of years, so the TensorFlow source looks a little intimidating.) Or would I be better off switching to something like Torch, which has imperative differentiation (Autograd) rather than symbolic differentiation?
Thanks for taking the time to help me with this. I have tried to keep it as concise as I could.
EDIT: After some further searching I came across this previously asked question. It's a little different (they are trying to avoid updating an LSTM network twice every iteration in Torch), and it doesn't have any answers yet.
Here is some code, if that helps:
'''
-Q-Learning agent for a grid-world environment.
-Receives input as raw RGB pixel representation of the screen.
-Uses an artificial neural network function approximator with one hidden layer

2015 Jonathon Byrd
'''

import random
import sys
#import copy

from rlglue.agent.Agent import Agent
from rlglue.agent import AgentLoader as AgentLoader
from rlglue.types import Action
from rlglue.types import Observation

import tensorflow as tf
import numpy as np

world_size = (3,3)
total_spaces = world_size[0] * world_size[1]

class simple_agent(Agent):

    #Constants
    discount_factor = tf.constant(0.5, name="discount_factor")
    learning_rate = tf.constant(0.01, name="learning_rate")
    exploration_rate = tf.Variable(0.2, name="exploration_rate") # used to be a constant :P
    hidden_layer_size = 12

    #Network Parameters - weights and biases
    W = [tf.Variable(tf.truncated_normal([total_spaces * 3, hidden_layer_size], stddev=0.1), name="layer_1_weights"),
         tf.Variable(tf.truncated_normal([hidden_layer_size,4], stddev=0.1), name="layer_2_weights")]
    b = [tf.Variable(tf.zeros([hidden_layer_size]), name="layer_1_biases"),
         tf.Variable(tf.zeros([4]), name="layer_2_biases")]

    #Input placeholders - observation and reward
    screen = tf.placeholder(tf.float32, shape=[1, total_spaces * 3], name="observation") #input pixel rgb values
    reward = tf.placeholder(tf.float32, shape=[], name="reward")

    #last step data
    last_obs = np.array([1, 2, 3], ndmin=4)
    last_act = -1

    #Last step placeholders
    last_screen = tf.placeholder(tf.float32, shape=[1, total_spaces * 3], name="previous_observation")
    last_move = tf.placeholder(tf.int32, shape = [], name="previous_action")

    next_prediction = tf.placeholder(tf.float32, shape = [], name="next_prediction")

    step_count = 0

    def __init__(self):
        #Initialize computational graphs
        self.q_preds = self.Q(self.screen)
        self.last_q_preds = self.Q(self.last_screen)
        self.action = self.choose_action(self.q_preds)
        self.next_pred = self.max_q(self.q_preds)
        self.last_pred = self.act_to_pred(self.last_move, self.last_q_preds) # inefficient recomputation
        self.loss = self.error(self.last_pred, self.reward, self.next_prediction)
        self.train = self.learn(self.loss)

        #Summaries and Statistics
        tf.scalar_summary(['loss'], self.loss)
        tf.scalar_summary('reward', self.reward)
        #w_hist = tf.histogram_summary("weights", self.W[0])
        self.summary_op = tf.merge_all_summaries()
        self.sess = tf.Session()
        self.summary_writer = tf.train.SummaryWriter('tensorlogs', graph_def=self.sess.graph_def)

    def agent_init(self,taskSpec):
        print("agent_init called")
        self.sess.run(tf.initialize_all_variables())

    def agent_start(self,observation):
        #print("agent_start called, observation = {0}".format(observation.intArray))
        o = np.divide(np.reshape(np.asarray(observation.intArray), (1,total_spaces * 3)), 255)
        return self.control(o)

    def agent_step(self,reward, observation):
        #print("agent_step called, observation = {0}".format(observation.intArray))
        print("step, reward: {0}".format(reward))
        o = np.divide(np.reshape(np.asarray(observation.intArray), (1,total_spaces * 3)), 255)

        next_prediction = self.sess.run([self.next_pred], feed_dict={self.screen:o})[0]

        if self.step_count % 10 == 0:
            summary_str = self.sess.run([self.summary_op, self.train],
                feed_dict={self.reward:reward, self.last_screen:self.last_obs,
                self.last_move:self.last_act, self.next_prediction:next_prediction})[0]

            self.summary_writer.add_summary(summary_str, global_step=self.step_count)
        else:
            self.sess.run([self.train],
                feed_dict={self.screen:o, self.reward:reward, self.last_screen:self.last_obs,
                self.last_move:self.last_act, self.next_prediction:next_prediction})

        return self.control(o)

    def control(self, observation):
        results = self.sess.run([self.action], feed_dict={self.screen:observation})
        action = results[0]

        self.last_act = action
        self.last_obs = observation

        if (action==0):  # convert action integer to direction character
            action = 'u'
        elif (action==1):
            action = 'l'
        elif (action==2):
            action = 'r'
        elif (action==3):
            action = 'd'

        returnAction = Action()
        returnAction.charArray = [action]
        #print("return action returned {0}".format(action))
        self.step_count += 1
        return returnAction

    def Q(self, obs):  #calculates state-action value prediction with feed-forward neural net
        with tf.name_scope('network_inference') as scope:
            h1 = tf.nn.relu(tf.matmul(obs, self.W[0]) + self.b[0])
            q_preds = tf.matmul(h1, self.W[1]) + self.b[1] #linear activation
            return tf.reshape(q_preds, shape=[4])

    def choose_action(self, q_preds):  #chooses action epsilon-greedily
        with tf.name_scope('action_choice') as scope:
            exploration_roll = tf.random_uniform([])
            #greedy_action = tf.argmax(q_preds, 0)  # gets the action with the highest predicted Q-value
            #random_action = tf.cast(tf.floor(tf.random_uniform([], maxval=4.0)), tf.int64)

            #exploration rate updates
            #if self.step_count % 10000 == 0:
            #    self.exploration_rate.assign(tf.div(self.exploration_rate, 2))

            return tf.select(tf.greater_equal(exploration_roll, self.exploration_rate),
                tf.argmax(q_preds, 0),  #greedy_action
                tf.cast(tf.floor(tf.random_uniform([], maxval=4.0)), tf.int64))  #random_action

            '''
            Why does this return NoneType?:

            flag = tf.select(tf.greater_equal(exploration_roll, self.exploration_rate), 'g', 'r')
            if flag == 'g':  #greedy
                return tf.argmax(q_preds, 0)  # gets the action with the highest predicted Q-value
            elif flag == 'r':  #random
                return tf.cast(tf.floor(tf.random_uniform([], maxval=4.0)), tf.int64)
            '''

    def error(self, last_pred, r, next_pred):
        with tf.name_scope('loss_function') as scope:
            y = tf.add(r, tf.mul(self.discount_factor, next_pred))  #target
            return tf.square(tf.sub(y, last_pred))  #squared difference error

    def learn(self, loss):  #Update parameters using stochastic gradient descent
        #TODO: Either figure out how to avoid computing the q-prediction twice or just hardcode the gradients.
        with tf.name_scope('train') as scope:
            return tf.train.GradientDescentOptimizer(self.learning_rate).minimize(loss,
                var_list=[self.W[0], self.W[1], self.b[0], self.b[1]])

    def max_q(self, q_preds):
        with tf.name_scope('greedy_estimate') as scope:
            return tf.reduce_max(q_preds)  #best predicted action from current state

    def act_to_pred(self, a, preds):  #get the value prediction for action a
        with tf.name_scope('get_prediction') as scope:
            return tf.slice(preds, tf.reshape(a, shape=[1]), [1])

    def agent_end(self,reward):
        pass

    def agent_cleanup(self):
        self.sess.close()
        pass

    def agent_message(self,inMessage):
        if inMessage=="what is your name?":
            return "my name is simple_agent";
        else:
            return "I don't know how to respond to your message";


if __name__=="__main__":
    AgentLoader.loadAgent(simple_agent())
What you want to do is very difficult in Tensorflow as it stands today (0.6). Your best bet is to bite the bullet and call run multiple times, at the cost of recomputing the activations. However, we are very aware of this issue internally. A prototype "partial run" solution is in the works, but there is no timeline for its completion yet. Since a truly satisfactory answer might require modifying tensorflow itself, you could also file a github issue for this and see whether anyone else has anything to say about it there.
Edit: Experimental support for partial_run is now in: https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/client/session.py#L317
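For later readers, here is a rough sketch of how partial_run could cover the pattern in the question. It assumes the loss has been rebuilt on top of the same q_preds tensor (fed through screen), so the forward pass is shared between action selection and the later training step; attribute names refer to the agent code above, and the exact semantics are those of the experimental API linked here:

# One partial run spans both halves of an iteration.
handle = sess.partial_run_setup(
    fetches=[agent.action, agent.train],
    feeds=[agent.screen, agent.last_move, agent.reward, agent.next_prediction])

# First half: pick an action. The activations for this observation are computed once, here.
act = sess.partial_run(handle, agent.action, feed_dict={agent.screen: obs})

# ... control returns to the environment, which later calls agent_step
#     with the reward and the next observation ...

# Second half: finish the same run; Q(obs) is reused instead of recomputed.
next_pred = sess.run(agent.next_pred, feed_dict={agent.screen: next_obs})
sess.partial_run(handle, agent.train,
                 feed_dict={agent.last_move: act,
                            agent.reward: reward,
                            agent.next_prediction: next_pred})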