如何在 PyTorch 中去掉 OpenAI Gym 的 CartPole 游戏的渲染显示
How to remove showing renderings in the Cartpole game from OpenAI in Pytorch
我当前的代码基于 PyTorch 官网上的示例,其中使用 env.render() 来生成下一个状态。这使得游戏运行非常慢,我希望能在不渲染的情况下运行得快得多。下面是使用渲染函数的类,完整代码附在底部。有什么办法可以实现这一点吗?
class CartPoleEnvManager():
    """Manage a gym CartPole environment whose states are rendered frames.

    A state is the difference between two consecutive processed screens, so
    producing a state always requires rendering the environment.
    """

    def __init__(self, device):
        """Create the environment.

        Args:
            device: torch device on which returned tensors are placed.
        """
        self.device = device
        # unwrapped gives access to dynamics that are hidden otherwise
        self.env = gym.make('CartPole-v0').unwrapped
        self.env.reset()  # reset to starting state
        self.current_screen = None  # last processed frame, None until get_state() runs
        self.done = False  # whether the last step finished the episode

    def reset(self):
        """Start a new episode, dropping the cached frame and the done flag."""
        self.env.reset()
        self.current_screen = None
        # Fix: a stale True from the previous episode must not leak into the new one.
        self.done = False

    def close(self):
        """Close the underlying environment."""
        self.env.close()

    def render(self, mode='human'):
        """Render the current environment state and return the env's result."""
        return self.env.render(mode)

    def num_actions_available(self):
        """Return the size of the environment's action space."""
        return self.env.action_space.n

    def take_action(self, action):
        """Apply ``action`` (a one-element tensor) and return the reward tensor.

        The observation and info returned by ``step`` are discarded; only the
        reward and the done flag (stored on ``self.done``) are used.
        """
        # step() expects a plain number, hence action.item()
        _, reward, self.done, _ = self.env.step(action.item())
        return torch.tensor([reward], device=self.device)

    def just_starting(self):
        """Return True while no frame has been processed for this episode."""
        return self.current_screen is None

    def get_state(self):
        """Return the current state as a frame-difference tensor.

        At the start of an episode, or once it is done, an all-zero tensor
        shaped like the processed screen is returned instead.
        """
        if self.just_starting() or self.done:
            self.current_screen = self.get_processed_screen()
            return torch.zeros_like(self.current_screen)
        previous_screen = self.current_screen
        latest_screen = self.get_processed_screen()
        self.current_screen = latest_screen
        return latest_screen - previous_screen

    def get_screen_height(self):
        """Return dimension 2 (height) of the processed screen."""
        return self.get_processed_screen().shape[2]

    def get_screen_width(self):
        """Return dimension 3 (width) of the processed screen."""
        return self.get_processed_screen().shape[3]

    def get_processed_screen(self):
        """Render an RGB frame, crop it and convert it to a batched tensor."""
        # PyTorch expects CHW, so move the channel axis to the front
        screen = self.env.render(mode='rgb_array').transpose((2, 0, 1))
        return self.transform_screen_data(self.crop_screen(screen))

    def crop_screen(self, screen):
        """Keep only the rows between 40% and 80% of the frame height."""
        screen_height = screen.shape[1]
        top = int(screen_height * 0.4)
        bottom = int(screen_height * 0.8)
        return screen[:, top:bottom, :]

    def transform_screen_data(self, screen):
        """Divide by 255, resize to 40x90 and add a batch dimension (BCHW)."""
        screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
        screen = torch.from_numpy(screen)
        # torchvision transforms do the resizing via a PIL image round trip
        resize = T.Compose([
            T.ToPILImage(),
            T.Resize((40, 90)),
            T.ToTensor(),
        ])
        return resize(screen).unsqueeze(0).to(self.device)
完整代码:
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image
import torch
import torch.nn as nnfr
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import pennylane as qml
import numpy as np
import torch
import torch.nn as nn
from torch.nn.functional import relu, sigmoid
class DQN(nn.Module):
    """Fully connected Q-network: flattened 3-channel frame in, 2 action values out."""

    def __init__(self, img_height, img_width):
        """Build the layers.

        Args:
            img_height: height of the input frames in pixels.
            img_width: width of the input frames in pixels.
        """
        super().__init__()
        # the input is flattened, so fc1 sees height * width * 3 channel values
        self.fc1 = nn.Linear(in_features=img_height * img_width * 3, out_features=64)
        self.fc2 = nn.Linear(in_features=64, out_features=48)
        self.fc3 = nn.Linear(in_features=48, out_features=32)
        self.out = nn.Linear(in_features=32, out_features=2)

    def forward(self, b):
        """Return one Q-value per action for every sample in batch ``b``."""
        b = b.flatten(start_dim=1)  # keep the batch dimension, flatten the rest
        for hidden in (self.fc1, self.fc2, self.fc3):
            b = F.relu(hidden(b))
        return self.out(b)
# With an inline matplotlib backend the IPython display helper is imported;
# plot() uses it to clear the previous output.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display
# A single transition; experiences are stored in replay memory and used to train the network.
Experience = namedtuple('Experience', ['state', 'action', 'next_state', 'reward'])
class ReplayMemory():
    """Fixed-capacity store of experiences that overwrites the oldest entries."""

    def __init__(self, capacity):
        """Create an empty memory holding at most ``capacity`` experiences."""
        self.capacity = capacity
        self.memory = []  # stored experiences
        self.push_count = 0  # total number of experiences ever pushed

    def push(self, experience):
        """Store ``experience``, replacing the oldest one once the memory is full."""
        if len(self.memory) < self.capacity:
            self.memory.append(experience)
        else:
            # wrap around so the slot holding the oldest experience is reused
            self.memory[self.push_count % self.capacity] = experience
        self.push_count += 1

    def sample(self, batch_size):
        """Return ``batch_size`` distinct, randomly chosen experiences."""
        return random.sample(self.memory, batch_size)

    def can_provide_sample(self, batch_size):
        """Return True once at least ``batch_size`` experiences are stored.

        This matters at the beginning of training, while the memory is
        still smaller than one batch.
        """
        return len(self.memory) >= batch_size
class EpsilonGreedyStrategy():
    """Exploration-rate schedule for the exploration vs. exploitation trade-off."""

    def __init__(self, start, end, decay):
        """Store the schedule parameters.

        Args:
            start: scale of the decaying term.
            end: constant offset the rate approaches as steps grow.
            decay: how quickly the decaying term shrinks per step.
        """
        self.start = start
        self.end = end
        self.decay = decay

    def get_exploration_rate(self, current_step):
        """Return ``end + start / (1 + decay * current_step)``.

        NOTE(review): at step 0 this yields ``start + end`` rather than
        ``start``. An exponential variant,
        ``end + (start - end) * exp(-current_step * decay)``, was previously
        used here; confirm which schedule is intended.
        """
        return self.end + self.start * (1 / (1 + self.decay * current_step))
class LearningRate():
    """Learning-rate schedule with the same hyperbolic decay as the epsilon strategy."""

    def __init__(self, start, end, decay, current_step):
        """Store the schedule parameters and an initial call counter."""
        self.start = start
        self.end = end
        self.decay = decay
        self.current_step = current_step

    def get_learning_rate(self, current_step):
        """Return ``end + start / (1 + decay * current_step)``.

        The rate is computed from the ``current_step`` argument;
        ``self.current_step`` is only incremented once per call.
        """
        self.current_step += 1
        return self.end + self.start * (1 / (1 + self.decay * current_step))
class lr():
    """Step counter around a learning-rate schedule (kept for possible future use)."""

    def __init__(self, learning_rate):
        """Wrap ``learning_rate``, an object exposing ``get_learning_rate(step)``."""
        self.current_step = 0
        self.learning_rate = learning_rate

    def update_lr(self):
        """Advance one step and return the rate for the step just consumed.

        Fix: the schedule is read from ``self.learning_rate`` (the original
        referenced an undefined global) and the computed rate is returned
        instead of being thrown away.
        """
        rate = self.learning_rate.get_learning_rate(self.current_step)
        self.current_step += 1
        return rate
class Agent():
    """Epsilon-greedy agent that picks actions from a policy network."""

    def __init__(self, strategy, num_actions, device):
        """Create the agent.

        Args:
            strategy: object exposing ``get_exploration_rate(step)``.
            num_actions: number of actions available from a state.
            device: torch device on which action tensors are placed.
        """
        self.current_step = 0  # number of actions selected so far
        self.strategy = strategy
        self.num_actions = num_actions
        self.device = device

    def select_action(self, state, policy_net):
        """Return a one-element action tensor for ``state``.

        With probability given by the exploration rate a random action is
        chosen; otherwise the action with the highest value from
        ``policy_net`` is used.
        """
        # Fix: use the agent's own strategy rather than a module-level global.
        rate = self.strategy.get_exploration_rate(self.current_step)
        self.current_step += 1

        if rate > random.random():
            action = random.randrange(self.num_actions)  # explore
            return torch.tensor([action], device=self.device)

        with torch.no_grad():  # no gradient tracking needed for inference
            return policy_net(state).argmax(dim=1).to(self.device)  # exploit
class CartPoleEnvManager():
    """Manage a gym CartPole environment whose states are rendered frames.

    A state is the difference between two consecutive processed screens, so
    producing a state always requires rendering the environment.
    """

    def __init__(self, device):
        """Create the environment.

        Args:
            device: torch device on which returned tensors are placed.
        """
        self.device = device
        # unwrapped gives access to dynamics that are hidden otherwise
        self.env = gym.make('CartPole-v0').unwrapped
        self.env.reset()  # reset to starting state
        self.current_screen = None  # last processed frame, None until get_state() runs
        self.done = False  # whether the last step finished the episode

    def reset(self):
        """Start a new episode, dropping the cached frame and the done flag."""
        self.env.reset()
        self.current_screen = None
        # Fix: a stale True from the previous episode must not leak into the new one.
        self.done = False

    def close(self):
        """Close the underlying environment."""
        self.env.close()

    def render(self, mode='human'):
        """Render the current environment state and return the env's result."""
        return self.env.render(mode)

    def num_actions_available(self):
        """Return the size of the environment's action space."""
        return self.env.action_space.n

    def take_action(self, action):
        """Apply ``action`` (a one-element tensor) and return the reward tensor.

        The observation and info returned by ``step`` are discarded; only the
        reward and the done flag (stored on ``self.done``) are used.
        """
        # step() expects a plain number, hence action.item()
        _, reward, self.done, _ = self.env.step(action.item())
        return torch.tensor([reward], device=self.device)

    def just_starting(self):
        """Return True while no frame has been processed for this episode."""
        return self.current_screen is None

    def get_state(self):
        """Return the current state as a frame-difference tensor.

        At the start of an episode, or once it is done, an all-zero tensor
        shaped like the processed screen is returned instead.
        """
        if self.just_starting() or self.done:
            self.current_screen = self.get_processed_screen()
            return torch.zeros_like(self.current_screen)
        previous_screen = self.current_screen
        latest_screen = self.get_processed_screen()
        self.current_screen = latest_screen
        return latest_screen - previous_screen

    def get_screen_height(self):
        """Return dimension 2 (height) of the processed screen."""
        return self.get_processed_screen().shape[2]

    def get_screen_width(self):
        """Return dimension 3 (width) of the processed screen."""
        return self.get_processed_screen().shape[3]

    def get_processed_screen(self):
        """Render an RGB frame, crop it and convert it to a batched tensor."""
        # PyTorch expects CHW, so move the channel axis to the front
        screen = self.env.render(mode='rgb_array').transpose((2, 0, 1))
        return self.transform_screen_data(self.crop_screen(screen))

    def crop_screen(self, screen):
        """Keep only the rows between 40% and 80% of the frame height."""
        screen_height = screen.shape[1]
        top = int(screen_height * 0.4)
        bottom = int(screen_height * 0.8)
        return screen[:, top:bottom, :]

    def transform_screen_data(self, screen):
        """Divide by 255, resize to 40x90 and add a batch dimension (BCHW)."""
        screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
        screen = torch.from_numpy(screen)
        # torchvision transforms do the resizing via a PIL image round trip
        resize = T.Compose([
            T.ToPILImage(),
            T.Resize((40, 90)),
            T.ToTensor(),
        ])
        return resize(screen).unsqueeze(0).to(self.device)
def plot(values, moving_avg_period):
    """Redraw the training curve and print the latest moving average.

    Args:
        values: per-episode durations recorded so far; must be non-empty
            because the last moving-average entry is printed.
        moving_avg_period: window size passed to get_moving_average.
    """
    plt.figure(2)
    plt.clf()
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(values)

    moving_avg = get_moving_average(moving_avg_period, values)
    plt.plot(moving_avg)
    plt.pause(0.001)  # brief pause so the figure gets updated
    print("Episode", len(values), "\n",
          moving_avg_period, "episode moving avg:", moving_avg[-1])
    if is_ipython:
        display.clear_output(wait=True)
def get_moving_average(period, values):
    """Return the ``period``-wide moving average of ``values`` as a numpy array.

    The result has the same length as ``values``. Positions that do not yet
    have a full window are 0, so fewer than ``period`` values yield all zeros.
    """
    series = torch.tensor(values, dtype=torch.float)
    if len(series) < period:
        return torch.zeros(len(series)).numpy()

    # unfold yields every length-`period` sliding window; average each of them
    window_means = series.unfold(dimension=0, size=period, step=1) \
        .mean(dim=1).flatten(start_dim=0)
    # pad the front so the output lines up index-for-index with the input
    return torch.cat((torch.zeros(period - 1), window_means)).numpy()
def extract_tensors(experiences):
    """Convert a batch of experiences into one tensor per field.

    Args:
        experiences: iterable of objects with ``state``, ``action``,
            ``reward`` and ``next_state`` tensor attributes.

    Returns:
        Tuple ``(states, actions, rewards, next_states)``, each built by
        concatenating the corresponding field along dimension 0.
    """
    experiences = list(experiences)  # allow one-shot iterables; traversed once per field
    states = torch.cat([exp.state for exp in experiences])
    actions = torch.cat([exp.action for exp in experiences])
    rewards = torch.cat([exp.reward for exp in experiences])
    next_states = torch.cat([exp.next_state for exp in experiences])
    return (states, actions, rewards, next_states)
class QValues():
    """Static helpers computing current and next-state Q-values."""

    # Kept for backward compatibility; get_next now follows the input's device.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    @staticmethod
    def get_current(policy_net, states, actions):
        """Return the Q-value ``policy_net`` predicts for each taken action.

        The result has one column: the value selected by ``actions`` for
        each row of ``policy_net(states)``.
        """
        return policy_net(states).gather(dim=1, index=actions.unsqueeze(-1))

    @staticmethod
    def get_next(target_net, next_states):
        """Return the maximum target-network Q-value for each next state.

        Final states are encoded as all-zero tensors and get a value of 0.

        Fix: a state counts as final only when every entry is zero. The
        original tested ``max == 0``, which also matched frame differences
        whose largest entry happens to be 0. The result is allocated on the
        same device as ``next_states``.
        """
        flat_states = next_states.flatten(start_dim=1)
        non_final_mask = ~flat_states.eq(0).all(dim=1)

        values = torch.zeros(next_states.shape[0], device=next_states.device)
        if non_final_mask.any():
            non_final_states = next_states[non_final_mask]
            values[non_final_mask] = target_net(non_final_states).max(dim=1)[0].detach()
        return values
# --- Hyperparameters ---
batch_size = 128
gamma = 0.999  # discount factor applied to next-state Q-values
eps_start = 1
eps_end = 0.01
eps_decay = 0.0005
target_update = 10  # copy policy_net into target_net every this many episodes
memory_size = 500000
lr_start = 0.01
lr_end = 0.00001
lr_decay = 0.00009
num_episodes = 1000  # run for more episodes for better results

# --- Environment, agent and networks ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
em = CartPoleEnvManager(device)
strategy = EpsilonGreedyStrategy(eps_start, eps_end, eps_decay)
agent = Agent(strategy, em.num_actions_available(), device)
memory = ReplayMemory(memory_size)

policy_net = DQN(em.get_screen_height(), em.get_screen_width()).to(device)
target_net = DQN(em.get_screen_height(), em.get_screen_width()).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()  # target_net is only used for inference, never trained directly

optimizer = optim.Adam(params=policy_net.parameters(), lr=lr_start)
# Fix: build the scheduler once. Re-creating it on every optimisation step
# restarted its step counter, so the learning rate never actually decayed.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.9)

# --- Training loop ---
episode_durations = []
for episode in range(num_episodes):
    em.reset()
    state = em.get_state()

    for timestep in count():
        action = agent.select_action(state, policy_net)
        reward = em.take_action(action)
        next_state = em.get_state()
        memory.push(Experience(state, action, next_state, reward))
        state = next_state

        if memory.can_provide_sample(batch_size):
            experiences = memory.sample(batch_size)
            states, actions, rewards, next_states = extract_tensors(experiences)

            current_q_values = QValues.get_current(policy_net, states, actions)
            # maximum target-network Q-value of each next state
            next_q_values = QValues.get_next(target_net, next_states)
            target_q_values = (next_q_values * gamma) + rewards

            loss = F.mse_loss(current_q_values, target_q_values.unsqueeze(1))
            optimizer.zero_grad()  # clear gradients left over from the previous step
            loss.backward()  # gradients of the loss w.r.t. policy_net parameters
            optimizer.step()  # apply the gradients
            scheduler.step()

        if em.done:
            episode_durations.append(timestep)
            plot(episode_durations, 100)
            break

    if episode % target_update == 0:
        target_net.load_state_dict(policy_net.state_dict())

em.close()
您使用的是一个 "hacked"(也可以说是打过补丁的)CartPole 环境版本,它实际上用渲染出的图像替换了 CartPole-v0 返回的真实状态。
因此,您的代码是在尝试训练一个以图像作为输入的策略,而不是以原始 CartPole-v0 返回的 4 维特征数组作为输入。
如果仔细观察下面这次调用:
_, reward, self.done, _ = self.env.step(action.item())
第一个元素 _ 就是原始 CartPole-v0 环境的实际状态。
然而,您的类并没有使用它,而是进行渲染并返回图像作为训练的输入。
因此,对于现有任务(有效状态是图像),您不能真正跳过渲染,因为它是为策略准备输入的一部分。
我当前的代码基于 PyTorch 官网上的示例,其中使用 env.render() 来生成下一个状态。这使得游戏运行非常慢,我希望能在不渲染的情况下运行得快得多。下面是使用渲染函数的类,完整代码附在底部。有什么办法可以实现这一点吗?
class CartPoleEnvManager():
    """Manage a gym CartPole environment whose states are rendered frames.

    A state is the difference between two consecutive processed screens, so
    producing a state always requires rendering the environment.
    """

    def __init__(self, device):
        """Create the environment.

        Args:
            device: torch device on which returned tensors are placed.
        """
        self.device = device
        # unwrapped gives access to dynamics that are hidden otherwise
        self.env = gym.make('CartPole-v0').unwrapped
        self.env.reset()  # reset to starting state
        self.current_screen = None  # last processed frame, None until get_state() runs
        self.done = False  # whether the last step finished the episode

    def reset(self):
        """Start a new episode, dropping the cached frame and the done flag."""
        self.env.reset()
        self.current_screen = None
        # Fix: a stale True from the previous episode must not leak into the new one.
        self.done = False

    def close(self):
        """Close the underlying environment."""
        self.env.close()

    def render(self, mode='human'):
        """Render the current environment state and return the env's result."""
        return self.env.render(mode)

    def num_actions_available(self):
        """Return the size of the environment's action space."""
        return self.env.action_space.n

    def take_action(self, action):
        """Apply ``action`` (a one-element tensor) and return the reward tensor.

        The observation and info returned by ``step`` are discarded; only the
        reward and the done flag (stored on ``self.done``) are used.
        """
        # step() expects a plain number, hence action.item()
        _, reward, self.done, _ = self.env.step(action.item())
        return torch.tensor([reward], device=self.device)

    def just_starting(self):
        """Return True while no frame has been processed for this episode."""
        return self.current_screen is None

    def get_state(self):
        """Return the current state as a frame-difference tensor.

        At the start of an episode, or once it is done, an all-zero tensor
        shaped like the processed screen is returned instead.
        """
        if self.just_starting() or self.done:
            self.current_screen = self.get_processed_screen()
            return torch.zeros_like(self.current_screen)
        previous_screen = self.current_screen
        latest_screen = self.get_processed_screen()
        self.current_screen = latest_screen
        return latest_screen - previous_screen

    def get_screen_height(self):
        """Return dimension 2 (height) of the processed screen."""
        return self.get_processed_screen().shape[2]

    def get_screen_width(self):
        """Return dimension 3 (width) of the processed screen."""
        return self.get_processed_screen().shape[3]

    def get_processed_screen(self):
        """Render an RGB frame, crop it and convert it to a batched tensor."""
        # PyTorch expects CHW, so move the channel axis to the front
        screen = self.env.render(mode='rgb_array').transpose((2, 0, 1))
        return self.transform_screen_data(self.crop_screen(screen))

    def crop_screen(self, screen):
        """Keep only the rows between 40% and 80% of the frame height."""
        screen_height = screen.shape[1]
        top = int(screen_height * 0.4)
        bottom = int(screen_height * 0.8)
        return screen[:, top:bottom, :]

    def transform_screen_data(self, screen):
        """Divide by 255, resize to 40x90 and add a batch dimension (BCHW)."""
        screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
        screen = torch.from_numpy(screen)
        # torchvision transforms do the resizing via a PIL image round trip
        resize = T.Compose([
            T.ToPILImage(),
            T.Resize((40, 90)),
            T.ToTensor(),
        ])
        return resize(screen).unsqueeze(0).to(self.device)
完整代码:
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image
import torch
import torch.nn as nnfr
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import pennylane as qml
import numpy as np
import torch
import torch.nn as nn
from torch.nn.functional import relu, sigmoid
class DQN(nn.Module):
    """Fully connected Q-network: flattened 3-channel frame in, 2 action values out."""

    def __init__(self, img_height, img_width):
        """Build the layers.

        Args:
            img_height: height of the input frames in pixels.
            img_width: width of the input frames in pixels.
        """
        super().__init__()
        # the input is flattened, so fc1 sees height * width * 3 channel values
        self.fc1 = nn.Linear(in_features=img_height * img_width * 3, out_features=64)
        self.fc2 = nn.Linear(in_features=64, out_features=48)
        self.fc3 = nn.Linear(in_features=48, out_features=32)
        self.out = nn.Linear(in_features=32, out_features=2)

    def forward(self, b):
        """Return one Q-value per action for every sample in batch ``b``."""
        b = b.flatten(start_dim=1)  # keep the batch dimension, flatten the rest
        for hidden in (self.fc1, self.fc2, self.fc3):
            b = F.relu(hidden(b))
        return self.out(b)
# With an inline matplotlib backend the IPython display helper is imported;
# plot() uses it to clear the previous output.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display
# A single transition; experiences are stored in replay memory and used to train the network.
Experience = namedtuple('Experience', ['state', 'action', 'next_state', 'reward'])
class ReplayMemory():
    """Fixed-capacity store of experiences that overwrites the oldest entries."""

    def __init__(self, capacity):
        """Create an empty memory holding at most ``capacity`` experiences."""
        self.capacity = capacity
        self.memory = []  # stored experiences
        self.push_count = 0  # total number of experiences ever pushed

    def push(self, experience):
        """Store ``experience``, replacing the oldest one once the memory is full."""
        if len(self.memory) < self.capacity:
            self.memory.append(experience)
        else:
            # wrap around so the slot holding the oldest experience is reused
            self.memory[self.push_count % self.capacity] = experience
        self.push_count += 1

    def sample(self, batch_size):
        """Return ``batch_size`` distinct, randomly chosen experiences."""
        return random.sample(self.memory, batch_size)

    def can_provide_sample(self, batch_size):
        """Return True once at least ``batch_size`` experiences are stored.

        This matters at the beginning of training, while the memory is
        still smaller than one batch.
        """
        return len(self.memory) >= batch_size
class EpsilonGreedyStrategy():
    """Exploration-rate schedule for the exploration vs. exploitation trade-off."""

    def __init__(self, start, end, decay):
        """Store the schedule parameters.

        Args:
            start: scale of the decaying term.
            end: constant offset the rate approaches as steps grow.
            decay: how quickly the decaying term shrinks per step.
        """
        self.start = start
        self.end = end
        self.decay = decay

    def get_exploration_rate(self, current_step):
        """Return ``end + start / (1 + decay * current_step)``.

        NOTE(review): at step 0 this yields ``start + end`` rather than
        ``start``. An exponential variant,
        ``end + (start - end) * exp(-current_step * decay)``, was previously
        used here; confirm which schedule is intended.
        """
        return self.end + self.start * (1 / (1 + self.decay * current_step))
class LearningRate():
    """Learning-rate schedule with the same hyperbolic decay as the epsilon strategy."""

    def __init__(self, start, end, decay, current_step):
        """Store the schedule parameters and an initial call counter."""
        self.start = start
        self.end = end
        self.decay = decay
        self.current_step = current_step

    def get_learning_rate(self, current_step):
        """Return ``end + start / (1 + decay * current_step)``.

        The rate is computed from the ``current_step`` argument;
        ``self.current_step`` is only incremented once per call.
        """
        self.current_step += 1
        return self.end + self.start * (1 / (1 + self.decay * current_step))
class lr():
    """Step counter around a learning-rate schedule (kept for possible future use)."""

    def __init__(self, learning_rate):
        """Wrap ``learning_rate``, an object exposing ``get_learning_rate(step)``."""
        self.current_step = 0
        self.learning_rate = learning_rate

    def update_lr(self):
        """Advance one step and return the rate for the step just consumed.

        Fix: the schedule is read from ``self.learning_rate`` (the original
        referenced an undefined global) and the computed rate is returned
        instead of being thrown away.
        """
        rate = self.learning_rate.get_learning_rate(self.current_step)
        self.current_step += 1
        return rate
class Agent():
    """Epsilon-greedy agent that picks actions from a policy network."""

    def __init__(self, strategy, num_actions, device):
        """Create the agent.

        Args:
            strategy: object exposing ``get_exploration_rate(step)``.
            num_actions: number of actions available from a state.
            device: torch device on which action tensors are placed.
        """
        self.current_step = 0  # number of actions selected so far
        self.strategy = strategy
        self.num_actions = num_actions
        self.device = device

    def select_action(self, state, policy_net):
        """Return a one-element action tensor for ``state``.

        With probability given by the exploration rate a random action is
        chosen; otherwise the action with the highest value from
        ``policy_net`` is used.
        """
        # Fix: use the agent's own strategy rather than a module-level global.
        rate = self.strategy.get_exploration_rate(self.current_step)
        self.current_step += 1

        if rate > random.random():
            action = random.randrange(self.num_actions)  # explore
            return torch.tensor([action], device=self.device)

        with torch.no_grad():  # no gradient tracking needed for inference
            return policy_net(state).argmax(dim=1).to(self.device)  # exploit
class CartPoleEnvManager():
    """Manage a gym CartPole environment whose states are rendered frames.

    A state is the difference between two consecutive processed screens, so
    producing a state always requires rendering the environment.
    """

    def __init__(self, device):
        """Create the environment.

        Args:
            device: torch device on which returned tensors are placed.
        """
        self.device = device
        # unwrapped gives access to dynamics that are hidden otherwise
        self.env = gym.make('CartPole-v0').unwrapped
        self.env.reset()  # reset to starting state
        self.current_screen = None  # last processed frame, None until get_state() runs
        self.done = False  # whether the last step finished the episode

    def reset(self):
        """Start a new episode, dropping the cached frame and the done flag."""
        self.env.reset()
        self.current_screen = None
        # Fix: a stale True from the previous episode must not leak into the new one.
        self.done = False

    def close(self):
        """Close the underlying environment."""
        self.env.close()

    def render(self, mode='human'):
        """Render the current environment state and return the env's result."""
        return self.env.render(mode)

    def num_actions_available(self):
        """Return the size of the environment's action space."""
        return self.env.action_space.n

    def take_action(self, action):
        """Apply ``action`` (a one-element tensor) and return the reward tensor.

        The observation and info returned by ``step`` are discarded; only the
        reward and the done flag (stored on ``self.done``) are used.
        """
        # step() expects a plain number, hence action.item()
        _, reward, self.done, _ = self.env.step(action.item())
        return torch.tensor([reward], device=self.device)

    def just_starting(self):
        """Return True while no frame has been processed for this episode."""
        return self.current_screen is None

    def get_state(self):
        """Return the current state as a frame-difference tensor.

        At the start of an episode, or once it is done, an all-zero tensor
        shaped like the processed screen is returned instead.
        """
        if self.just_starting() or self.done:
            self.current_screen = self.get_processed_screen()
            return torch.zeros_like(self.current_screen)
        previous_screen = self.current_screen
        latest_screen = self.get_processed_screen()
        self.current_screen = latest_screen
        return latest_screen - previous_screen

    def get_screen_height(self):
        """Return dimension 2 (height) of the processed screen."""
        return self.get_processed_screen().shape[2]

    def get_screen_width(self):
        """Return dimension 3 (width) of the processed screen."""
        return self.get_processed_screen().shape[3]

    def get_processed_screen(self):
        """Render an RGB frame, crop it and convert it to a batched tensor."""
        # PyTorch expects CHW, so move the channel axis to the front
        screen = self.env.render(mode='rgb_array').transpose((2, 0, 1))
        return self.transform_screen_data(self.crop_screen(screen))

    def crop_screen(self, screen):
        """Keep only the rows between 40% and 80% of the frame height."""
        screen_height = screen.shape[1]
        top = int(screen_height * 0.4)
        bottom = int(screen_height * 0.8)
        return screen[:, top:bottom, :]

    def transform_screen_data(self, screen):
        """Divide by 255, resize to 40x90 and add a batch dimension (BCHW)."""
        screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
        screen = torch.from_numpy(screen)
        # torchvision transforms do the resizing via a PIL image round trip
        resize = T.Compose([
            T.ToPILImage(),
            T.Resize((40, 90)),
            T.ToTensor(),
        ])
        return resize(screen).unsqueeze(0).to(self.device)
def plot(values, moving_avg_period):
    """Redraw the training curve and print the latest moving average.

    Args:
        values: per-episode durations recorded so far; must be non-empty
            because the last moving-average entry is printed.
        moving_avg_period: window size passed to get_moving_average.
    """
    plt.figure(2)
    plt.clf()
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(values)

    moving_avg = get_moving_average(moving_avg_period, values)
    plt.plot(moving_avg)
    plt.pause(0.001)  # brief pause so the figure gets updated
    print("Episode", len(values), "\n",
          moving_avg_period, "episode moving avg:", moving_avg[-1])
    if is_ipython:
        display.clear_output(wait=True)
def get_moving_average(period, values):
    """Return the ``period``-wide moving average of ``values`` as a numpy array.

    The result has the same length as ``values``. Positions that do not yet
    have a full window are 0, so fewer than ``period`` values yield all zeros.
    """
    series = torch.tensor(values, dtype=torch.float)
    if len(series) < period:
        return torch.zeros(len(series)).numpy()

    # unfold yields every length-`period` sliding window; average each of them
    window_means = series.unfold(dimension=0, size=period, step=1) \
        .mean(dim=1).flatten(start_dim=0)
    # pad the front so the output lines up index-for-index with the input
    return torch.cat((torch.zeros(period - 1), window_means)).numpy()
def extract_tensors(experiences):
    """Convert a batch of experiences into one tensor per field.

    Args:
        experiences: iterable of objects with ``state``, ``action``,
            ``reward`` and ``next_state`` tensor attributes.

    Returns:
        Tuple ``(states, actions, rewards, next_states)``, each built by
        concatenating the corresponding field along dimension 0.
    """
    experiences = list(experiences)  # allow one-shot iterables; traversed once per field
    states = torch.cat([exp.state for exp in experiences])
    actions = torch.cat([exp.action for exp in experiences])
    rewards = torch.cat([exp.reward for exp in experiences])
    next_states = torch.cat([exp.next_state for exp in experiences])
    return (states, actions, rewards, next_states)
class QValues():
    """Static helpers computing current and next-state Q-values."""

    # Kept for backward compatibility; get_next now follows the input's device.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    @staticmethod
    def get_current(policy_net, states, actions):
        """Return the Q-value ``policy_net`` predicts for each taken action.

        The result has one column: the value selected by ``actions`` for
        each row of ``policy_net(states)``.
        """
        return policy_net(states).gather(dim=1, index=actions.unsqueeze(-1))

    @staticmethod
    def get_next(target_net, next_states):
        """Return the maximum target-network Q-value for each next state.

        Final states are encoded as all-zero tensors and get a value of 0.

        Fix: a state counts as final only when every entry is zero. The
        original tested ``max == 0``, which also matched frame differences
        whose largest entry happens to be 0. The result is allocated on the
        same device as ``next_states``.
        """
        flat_states = next_states.flatten(start_dim=1)
        non_final_mask = ~flat_states.eq(0).all(dim=1)

        values = torch.zeros(next_states.shape[0], device=next_states.device)
        if non_final_mask.any():
            non_final_states = next_states[non_final_mask]
            values[non_final_mask] = target_net(non_final_states).max(dim=1)[0].detach()
        return values
# --- Hyperparameters ---
batch_size = 128
gamma = 0.999  # discount factor applied to next-state Q-values
eps_start = 1
eps_end = 0.01
eps_decay = 0.0005
target_update = 10  # copy policy_net into target_net every this many episodes
memory_size = 500000
lr_start = 0.01
lr_end = 0.00001
lr_decay = 0.00009
num_episodes = 1000  # run for more episodes for better results

# --- Environment, agent and networks ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
em = CartPoleEnvManager(device)
strategy = EpsilonGreedyStrategy(eps_start, eps_end, eps_decay)
agent = Agent(strategy, em.num_actions_available(), device)
memory = ReplayMemory(memory_size)

policy_net = DQN(em.get_screen_height(), em.get_screen_width()).to(device)
target_net = DQN(em.get_screen_height(), em.get_screen_width()).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()  # target_net is only used for inference, never trained directly

optimizer = optim.Adam(params=policy_net.parameters(), lr=lr_start)
# Fix: build the scheduler once. Re-creating it on every optimisation step
# restarted its step counter, so the learning rate never actually decayed.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.9)

# --- Training loop ---
episode_durations = []
for episode in range(num_episodes):
    em.reset()
    state = em.get_state()

    for timestep in count():
        action = agent.select_action(state, policy_net)
        reward = em.take_action(action)
        next_state = em.get_state()
        memory.push(Experience(state, action, next_state, reward))
        state = next_state

        if memory.can_provide_sample(batch_size):
            experiences = memory.sample(batch_size)
            states, actions, rewards, next_states = extract_tensors(experiences)

            current_q_values = QValues.get_current(policy_net, states, actions)
            # maximum target-network Q-value of each next state
            next_q_values = QValues.get_next(target_net, next_states)
            target_q_values = (next_q_values * gamma) + rewards

            loss = F.mse_loss(current_q_values, target_q_values.unsqueeze(1))
            optimizer.zero_grad()  # clear gradients left over from the previous step
            loss.backward()  # gradients of the loss w.r.t. policy_net parameters
            optimizer.step()  # apply the gradients
            scheduler.step()

        if em.done:
            episode_durations.append(timestep)
            plot(episode_durations, 100)
            break

    if episode % target_update == 0:
        target_net.load_state_dict(policy_net.state_dict())

em.close()
您使用的是一个 "hacked"(也可以说是打过补丁的)CartPole 环境版本,它实际上用渲染出的图像替换了 CartPole-v0 返回的真实状态。
因此,您的代码是在尝试训练一个以图像作为输入的策略,而不是以原始 CartPole-v0 返回的 4 维特征数组作为输入。
如果仔细观察下面这次调用:
_, reward, self.done, _ = self.env.step(action.item())
第一个元素 _ 就是原始 CartPole-v0 环境的实际状态。
然而,您的类并没有使用它,而是进行渲染并返回图像作为训练的输入。
因此,对于现有任务(有效状态是图像),您不能真正跳过渲染,因为它是为策略准备输入的一部分。