How to disable rendering in the CartPole game from OpenAI Gym in PyTorch

My current code is based on the PyTorch example on their website, where env.render() is used to create the next state. This makes the game run very slowly, and I would like it to run much faster without rendering. Below is the class that uses the render function; the full code is at the bottom. Any ideas on how to make this work?

class CartPoleEnvManager(): #manage cartpole env
    def __init__(self, device):
        self.device = device
        self.env = gym.make('CartPole-v0').unwrapped #unwrapped so we can see other dynamics that there is no access to otherwise
        self.env.reset() #reset to starting state
        self.current_screen = None #screen initialization
        self.done = False #if episode is finished

    def reset(self):
        self.env.reset()
        self.current_screen = None

    def close(self): #close env
        self.env.close()

    def render(self, mode='human'): #render the current state to screen
        return self.env.render(mode)

    def num_actions_available(self): #returns # actions available to agent (2)
        return self.env.action_space.n

    def take_action(self, action):# step returns tuple containing env observation, reward and diagnostic info -- all from taking a certain action
        _, reward, self.done, _ = self.env.step(action.item()) # only reward and done status are of importance
        return torch.tensor([reward], device=self.device)
    #####action is a tensor, action.item() gives a number, what step wants

    def just_starting(self):
        return self.current_screen is None

    def get_state(self): # return the current state of the env as a processed image of the screen
        if self.just_starting() or self.done:
            self.current_screen = self.get_processed_screen() #state = processed image of diff of 2 separate screens
            black_screen = torch.zeros_like(self.current_screen)
            return black_screen
        else:
            s1 = self.current_screen
            s2 = self.get_processed_screen() ####what is get_processed_screen? 
            self.current_screen = s2
            return s2 - s1 # this represents a single state

    def get_screen_height(self):
        screen = self.get_processed_screen()
        return screen.shape[2]

    def get_screen_width(self):
        screen = self.get_processed_screen()
        return screen.shape[3]

    def get_processed_screen(self):
        screen = self.env.render(mode='rgb_array').transpose((2, 0, 1)) # PyTorch expects CHW
        screen = self.crop_screen(screen)
        return self.transform_screen_data(screen)

    def crop_screen(self, screen):
        screen_height = screen.shape[1]

        # Strip off top and bottom
        top = int(screen_height * 0.4)
        bottom = int(screen_height * 0.8)
        screen = screen[:, top:bottom, :]
        return screen

    def transform_screen_data(self, screen):       
        # Convert to float, rescale, convert to tensor
        screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
        screen = torch.from_numpy(screen)

        # Use torchvision package to compose image transforms
        resize = T.Compose([
            T.ToPILImage()
            ,T.Resize((40,90))
            ,T.ToTensor()
        ])


        return resize(screen).unsqueeze(0).to(self.device) # add a batch dimension (BCHW)

Full code:

import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import pennylane as qml
from torch.nn.functional import relu, sigmoid



class DQN(nn.Module):
    def __init__(self, img_height, img_width):
        super().__init__()

        self.fc1 = nn.Linear(in_features=img_height*img_width*3, out_features=64)   
        self.fc2 = nn.Linear(in_features=64, out_features=48)
        self.fc3 = nn.Linear(in_features=48, out_features=32)
        self.out = nn.Linear(in_features=32, out_features=2)        



    def forward(self, b):
        b = b.flatten(start_dim=1)
        #t = F.relu(clayer_out)
        b = F.relu(self.fc1(b))
        b = F.relu(self.fc2(b))
        b = F.relu(self.fc3(b))
        b = self.out(b)
        return b



is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython: from IPython import display

Experience = namedtuple(
    'Experience',
    ('state', 'action', 'next_state', 'reward')
) # use experiences to train network



class ReplayMemory():
    def __init__(self, capacity): # replay memory has set capacity, only need to
        self.capacity = capacity # initialize capacity for a ReplayMemory object
        self.memory = [] #memory will hold the experiences
        self.push_count = 0 #to show how many experiences we've added to memory

    def push(self, experience): # to add experiences in memory
        if len(self.memory) < self.capacity: # ensuring that the length of memory doesn't exceed set capacity
            self.memory.append(experience) 
        else:
            self.memory[self.push_count % self.capacity] = experience # if memory is full,
            # overwrite the oldest experience (the memory acts as a circular buffer)
        self.push_count += 1

    def sample(self, batch_size): # returns a random sample of experiences, to later use for training
        return random.sample(self.memory, batch_size) # random.sample(sequence, k) gives k randomly chosen experiences from the sequence

    def can_provide_sample(self, batch_size): # can only sample once memory holds at least batch_size experiences -- important early in training
        return len(self.memory) >= batch_size


class EpsilonGreedyStrategy(): # exploration vs. exploitation trade-off
    def __init__(self, start, end, decay):
        self.start = start
        self.end = end
        self.decay = decay

    def get_exploration_rate(self, current_step): #### this was not explained in the video; unclear where it comes from
        return self.end + self.start*(1/(1+self.decay*current_step))
        # alternative (exponential) decay: self.end + (self.start - self.end) * math.exp(-1. * current_step * self.decay)


class LearningRate():
    def __init__(self, start, end, decay, current_step):
        self.start = start
        self.end = end
        self.decay = decay
        self.current_step = current_step
    def get_learning_rate(self, current_step):
        self.current_step += 1
        return self.end + self.start*(1/(1+self.decay*current_step))


class lr(): # learning rate class. Left for possible future use; needs updating before it can be used
    def __init__(self, learning_rate):
        self.current_step = 0
        self.learning_rate = learning_rate

    def update_lr(self):
        lr_rate = self.learning_rate.get_learning_rate(self.current_step) # use the LearningRate instance passed to the constructor
        self.current_step += 1
        return lr_rate

class Agent():
    def __init__(self, strategy, num_actions, device): # when we later create an agent object, need to get strategy from epsilon, num_actions = how many actions from a given state (2 for this game), device is the device in pytorch for tensor calculations CPU or GPU
        self.current_step = 0 # current step number in the environment
        self.strategy = strategy
        self.num_actions = num_actions
        self.device = device

    def select_action(self, state, policy_net): #policy_net is the policy trained by DQN
        rate = self.strategy.get_exploration_rate(self.current_step) # use the strategy passed to the agent, not the global
        self.current_step += 1

        if rate > random.random():
            action = random.randrange(self.num_actions)
            return torch.tensor([action]).to(self.device) # explore      
        else:
            with torch.no_grad(): #turn off gradient tracking
                return policy_net(state).argmax(dim=1).to(self.device) # exploit



class CartPoleEnvManager(): #manage cartpole env
    def __init__(self, device):
        self.device = device
        self.env = gym.make('CartPole-v0').unwrapped #unwrapped so we can see other dynamics that there is no access to otherwise
        self.env.reset() #reset to starting state
        self.current_screen = None #screen initialization
        self.done = False #if episode is finished

    def reset(self):
        self.env.reset()
        self.current_screen = None

    def close(self): #close env
        self.env.close()

    def render(self, mode='human'): #render the current state to screen
        return self.env.render(mode)

    def num_actions_available(self): #returns # actions available to agent (2)
        return self.env.action_space.n

    def take_action(self, action):# step returns tuple containing env observation, reward and diagnostic info -- all from taking a certain action
        _, reward, self.done, _ = self.env.step(action.item()) # only reward and done status are of importance
        return torch.tensor([reward], device=self.device)
    #####action is a tensor, action.item() gives a number, what step wants

    def just_starting(self):
        return self.current_screen is None

    def get_state(self): # return the current state of the env as a processed image of the screen
        if self.just_starting() or self.done:
            self.current_screen = self.get_processed_screen() #state = processed image of diff of 2 separate screens
            black_screen = torch.zeros_like(self.current_screen)
            return black_screen
        else:
            s1 = self.current_screen
            s2 = self.get_processed_screen() ####what is get_processed_screen? 
            self.current_screen = s2
            return s2 - s1 # this represents a single state

    def get_screen_height(self):
        screen = self.get_processed_screen()
        return screen.shape[2]

    def get_screen_width(self):
        screen = self.get_processed_screen()
        return screen.shape[3]

    def get_processed_screen(self):
        screen = self.env.render(mode='rgb_array').transpose((2, 0, 1)) # PyTorch expects CHW
        screen = self.crop_screen(screen)
        return self.transform_screen_data(screen)

    def crop_screen(self, screen):
        screen_height = screen.shape[1]

        # Strip off top and bottom
        top = int(screen_height * 0.4)
        bottom = int(screen_height * 0.8)
        screen = screen[:, top:bottom, :]
        return screen

    def transform_screen_data(self, screen):       
        # Convert to float, rescale, convert to tensor
        screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
        screen = torch.from_numpy(screen)

        # Use torchvision package to compose image transforms
        resize = T.Compose([
            T.ToPILImage()
            ,T.Resize((40,90))
            ,T.ToTensor()
        ])


        return resize(screen).unsqueeze(0).to(self.device) # add a batch dimension (BCHW)


def plot(values, moving_avg_period):
    plt.figure(2)
    plt.clf()        
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(values)

    moving_avg = get_moving_average(moving_avg_period, values)
    plt.plot(moving_avg)    
    plt.pause(0.001)
    print("Episode", len(values), "\n", \
          moving_avg_period, "episode moving avg:", moving_avg[-1])
    if is_ipython: display.clear_output(wait=True)

def get_moving_average(period, values):
    values = torch.tensor(values, dtype=torch.float)
    if len(values) >= period:
        moving_avg = values.unfold(dimension=0, size=period, step=1) \
            .mean(dim=1).flatten(start_dim=0)
        moving_avg = torch.cat((torch.zeros(period-1), moving_avg))
        return moving_avg.numpy()
    else:
        moving_avg = torch.zeros(len(values))
        return moving_avg.numpy()


def extract_tensors(experiences):
    # Convert batch of Experiences to Experience of batches
    batch = Experience(*zip(*experiences))

    t1 = torch.cat(batch.state)
    t2 = torch.cat(batch.action)
    t3 = torch.cat(batch.reward)
    t4 = torch.cat(batch.next_state)

    return (t1,t2,t3,t4)


class QValues():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    @staticmethod
    def get_current(policy_net, states, actions):
        return policy_net(states).gather(dim=1, index=actions.unsqueeze(-1))

    @staticmethod        
    def get_next(target_net, next_states):                
        final_state_locations = next_states.flatten(start_dim=1) \
            .max(dim=1)[0].eq(0).type(torch.bool)
        non_final_state_locations = (final_state_locations == False)
        non_final_states = next_states[non_final_state_locations]
        batch_size = next_states.shape[0]
        values = torch.zeros(batch_size).to(QValues.device)
        values[non_final_state_locations] = target_net(non_final_states).max(dim=1)[0].detach()
        return values


batch_size = 128
gamma = 0.999
eps_start = 1
eps_end = 0.01
eps_decay = 0.0005
target_update = 10
memory_size = 500000
lr_start = 0.01
lr_end = 0.00001
lr_decay = 0.00009
num_episodes = 1000 # run for more episodes for better results

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
em = CartPoleEnvManager(device)
strategy = EpsilonGreedyStrategy(eps_start, eps_end, eps_decay)
agent = Agent(strategy, em.num_actions_available(), device)
memory = ReplayMemory(memory_size)


policy_net = DQN(em.get_screen_height(), em.get_screen_width()).to(device)
target_net = DQN(em.get_screen_height(), em.get_screen_width()).to(device)
target_net.load_state_dict(policy_net.state_dict())

target_net.eval() #tells pytorch that target_net is only used for inference, not training
optimizer = optim.Adam(params=policy_net.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.9) # create the LR scheduler once, not inside the training loop

episode_durations = []
for episode in range(num_episodes): #iterate over each episode
    em.reset()
    state = em.get_state()

    for timestep in count():
        action = agent.select_action(state, policy_net)
        reward = em.take_action(action)
        next_state = em.get_state()
        memory.push(Experience(state, action, next_state, reward))
        state = next_state
        if memory.can_provide_sample(batch_size):
            experiences = memory.sample(batch_size)
            states, actions, rewards, next_states = extract_tensors(experiences)

            current_q_values = QValues.get_current(policy_net, states, actions)
            next_q_values = QValues.get_next(target_net, next_states) #will get the max qvalues of the next state, q values of next state are used via next state
            target_q_values = (next_q_values * gamma) + rewards

            loss = F.mse_loss(current_q_values, target_q_values.unsqueeze(1))
            optimizer.zero_grad() # set the gradients of all weights and biases in policy_net to zero
            loss.backward() # compute the gradient of the loss with respect to all weights and biases in the policy net
            optimizer.step() # update the weights and biases with the gradients computed by loss.backward()
            scheduler.step()

        if em.done:
            episode_durations.append(timestep)
            plot(episode_durations, 100)
            break
    if episode % target_update == 0:
        target_net.load_state_dict(policy_net.state_dict()) 


em.close()

You are using a "hacked" (patched, if you like) version of the CartPole environment that effectively replaces the real state returned by CartPole-v0 with a rendered image. Your code is therefore trying to train a policy that takes images as input rather than the 4-value feature array that the original CartPole-v0 returns.

If you look closely at the call

 _, reward, self.done, _ = self.env.step(action.item())

the first element (assigned to _) is the actual state of the original CartPole-v0 environment.
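To illustrate this, here is a minimal sketch (assuming the old Gym step API that your code already uses, where step returns four values) that runs CartPole-v0 with random actions and never calls render(); the observation returned by step is already the 4-value state (cart position, cart velocity, pole angle, pole angular velocity):

import gym

env = gym.make('CartPole-v0')
obs = env.reset()
done = False
while not done:
    # the first return value is the real 4-value state; no rendering is needed to obtain it
    obs, reward, done, _ = env.step(env.action_space.sample())
    print(obs)  # e.g. [ 0.031 -0.163  0.021  0.310]
env.close()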

The class you have, however, does not use it; it renders the screen instead and returns an image as the input for training.

So, for the task as it is set up (where the effective state is an image), you cannot really skip rendering, because rendering is part of preparing the input for the policy.
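If you are willing to change the task itself and train on that 4-value state instead of on screen images, then rendering can be dropped entirely. Below is a minimal, hypothetical sketch (the class name CartPoleStateEnvManager and its methods are my own, not part of your code; gym and torch are assumed to be imported as in your full code) of an environment manager that keeps the observation from step instead of discarding it:

class CartPoleStateEnvManager(): # hypothetical variant that uses the raw 4-value observation, so no rendering is needed
    def __init__(self, device):
        self.device = device
        self.env = gym.make('CartPole-v0').unwrapped
        self.done = False
        self.current_state = self.env.reset()

    def reset(self):
        self.current_state = self.env.reset()
        self.done = False

    def close(self):
        self.env.close()

    def num_actions_available(self):
        return self.env.action_space.n

    def take_action(self, action):
        # keep the observation instead of throwing it away
        self.current_state, reward, self.done, _ = self.env.step(action.item())
        return torch.tensor([reward], device=self.device)

    def get_state(self):
        # shape (1, 4): cart position, cart velocity, pole angle, pole angular velocity
        return torch.as_tensor(self.current_state, dtype=torch.float32, device=self.device).unsqueeze(0)

Note that this changes the rest of the setup as well: the DQN's first layer would need in_features=4 instead of img_height*img_width*3, and the black-screen/difference-of-frames logic (which QValues.get_next relies on to detect final states) would have to be rethought. Whether the network still learns well after these changes is a separate question.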