Keras Agent Training Takes Too Much Time

I'm new to reinforcement learning. I built an agent that feeds two inputs to its neural network (the first input is a tuple of two numbers representing the agent's current position; the second input is an array of numbers ranging from 0 to 3, representing the types of requests the agent receives from the environment) and outputs the best movement (move forward, backward, sideways, etc.).

Each episode has 300 steps, and the for loop in train_pos_nn() takes 5+ seconds (each call to predict() takes about 20 ms and each call to fit() about 7 ms), which amounts to 25+ minutes per episode, which is far too much time (roughly 17 days to complete the 1000 episodes needed for convergence; it takes the same amount of time on Google Colab (edit: even with the GPU option enabled, and a GPU cannot be set up for use on my local machine)).

Is there any way to reduce the time it takes to train the agent?

from collections import deque
import random
import time

import numpy as np
from tqdm import tqdm
from tensorflow.keras import layers, Model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam

# env, normalize_pos_state, REPLAY_MEMORY_SIZE, MIN_REPLAY_MEMORY_SIZE,
# DISCOUNT, n_episodes and agent_dqn are defined elsewhere in my code
n_possible_movements = 9
MINIBATCH_SIZE = 32

class DQNAgent(object):
    def __init__(self):
        #self.gamma = 0.95 
        self.epsilon = 1.0
        self.epsilon_decay = 0.8
        self.epsilon_min = 0.1
        self.learning_rate = 10e-4 
        self.tau = 1e-3
                        
        # Main models
        self.model_uav_pos = self._build_pos_model()

        # Target networks
        self.target_model_uav_pos = self._build_pos_model()
        # Copy weights
        self.target_model_uav_pos.set_weights(self.model_uav_pos.get_weights())

        # An array with last n steps for training
        self.replay_memory_pos_nn = deque(maxlen=REPLAY_MEMORY_SIZE)
        
    def _build_pos_model(self): # compile the DNN
        # create the DNN model
        dnn = self.create_pos_dnn()
        
        opt = Adam(learning_rate=self.learning_rate) #, decay=self.epsilon_decay)
        dnn.compile(loss="categorical_crossentropy", optimizer=opt, metrics=['accuracy'])
        
        return dnn
    
    def create_pos_dnn(self): 
        # initialize the input shape (The shape of an array is the number of elements in each dimension)
        pos_input_shape = (2,)
        requests_input_shape = (len(env.ues),)
        # How many possible outputs we can have
        output_nodes = n_possible_movements
        
        # Initialize the inputs
        uav_current_position = Input(shape=pos_input_shape, name='pos')
        ues_requests = Input(shape=requests_input_shape, name='requests')
        
        # Put them in a list
        list_inputs = [uav_current_position, ues_requests]
        
        # Merge all input features into a single large vector
        x = layers.concatenate(list_inputs)
        
        # Add a 1st Hidden (Dense) Layer
        dense_layer_1 = Dense(512, activation="relu")(x)
        
        # Add a 2nd Hidden (Dense) Layer
        dense_layer_2 = Dense(512, activation="relu")(dense_layer_1)
        
        # Add a 3rd Hidden (Dense) Layer
        dense_layer_3 = Dense(256, activation="relu")(dense_layer_2)
        
        # Output layer
        output_layer = Dense(output_nodes, activation="softmax")(dense_layer_3)

        model = Model(inputs=list_inputs, outputs=output_layer)
                        
        # return the DNN
        return model
    
    def remember_pos_nn(self, state, action, reward, next_state, done):
        self.replay_memory_pos_nn.append((state, action, reward, next_state, done)) 
        
    def act_upon_choosing_a_new_position(self, state): # state is a tuple (uav_position, requests_array)
        if np.random.rand() <= self.epsilon: # if acting randomly, take random action
            return random.randrange(n_possible_movements)
        pos =  np.array([state[0]])
        reqs =  np.array([state[1]])
        act_values = self.model_uav_pos.predict(x=[pos, reqs]) # if not acting randomly, predict reward value based on current state
        return np.argmax(act_values[0]) 
        
    def train_pos_nn(self):
        print("In Training..")

        # Start training only if certain number of samples is already saved
        if len(self.replay_memory_pos_nn) < MIN_REPLAY_MEMORY_SIZE:
            print("Exiting Training: Replay Memory Not Full Enough...")
            return

        # Get a minibatch of random samples from memory replay table
        minibatch = random.sample(self.replay_memory_pos_nn, MINIBATCH_SIZE)

        start_time = time.time()
        # Enumerate our batches
        for index, (current_state, action, reward, new_current_state, done) in enumerate(minibatch):
            print('...Starting Training...')
            target = 0
            pos =  np.array([current_state[0]])
            reqs =  np.array([current_state[1]])
            pos_next = np.array([new_current_state[0]])
            reqs_next = np.array([new_current_state[1]])
    
            if not done:
                target = reward + DISCOUNT * np.amax(self.target_model_uav_pos.predict(x=[pos_next, reqs_next]))
            else:
                target = reward

            # Update Q value for given state
            target_f = self.model_uav_pos.predict(x=[pos, reqs])
            target_f[0][action] = target

            self.model_uav_pos.fit([pos, reqs], \
                                   target_f, \
                                   verbose=2, \
                                   shuffle=False, \
                                   callbacks=None, \
                                   epochs=1 \
                                  )  
        end_time = time.time()
        print("Time", end_time - start_time)
        # Update target network counter every episode
        self.target_train()
        
    def target_train(self):
        weights = self.model_uav_pos.get_weights()
        target_weights = self.target_model_uav_pos.get_weights()
        for i in range(len(target_weights)):
            target_weights[i] = weights[i] * self.tau + target_weights[i] * (1 - self.tau)
        self.target_model_uav_pos.set_weights(target_weights)
# Main 
SIZE = 100 # size of the grid the agent is in
for episode in tqdm(range(1, n_episodes + 1), ascii=True, unit='episodes'):  
    # Reset environment and get initial state
    current_state = env.reset(SIZE)

    # Reset flag and start iterating until episode ends
    done = False
    steps_n = 300

    for t in range(steps_n): 
        # Normalize the input (the current state)
        current_state_normalized = normalize_pos_state(current_state)
        
        # Get new position for the agent
        action_pos = agent_dqn.act_upon_choosing_a_new_position(current_state_normalized)
        
        new_state, reward, done, _ = env.step(action_pos)
        
        agent_dqn.remember_pos_nn(current_state_normalized, action_pos, reward, normalize_pos_state(new_state), done)

        current_state = new_state # not normalized
        
        agent_dqn.train_pos_nn()

    # Decay epsilon
    if episode % 50 == 0:
        if agent_dqn.epsilon > agent_dqn.epsilon_min:
            agent_dqn.epsilon *= agent_dqn.epsilon_decay
            agent_dqn.epsilon = max(agent_dqn.epsilon, agent_dqn.epsilon_min)

Using a GPU (Graphics Processing Unit) will generally make model training much faster. You can follow these steps to train your model on a GPU:

How to Finally Install TensorFlow 2 GPU on Windows 10 in 2022:

  • Step 1: Find out the TF version and its drivers.
  • Step 2: Install Microsoft Visual Studio
  • Step 3: Install the NVIDIA CUDA Toolkit
  • Step 4: Install cuDNN
  • Step 5: Extract the ZIP folder and copy the core directories
  • Step 6: Add the CUDA Toolkit to PATH
  • Step 7: Install TensorFlow with Jupyter Lab in a virtual environment

(Detailed instructions are in the link above.)

Alternatively, you can use Google Colab, which has a GPU option and doesn't require you to install anything. You can change the accelerator in the Colab settings: Runtime -> Change runtime type -> None/GPU/TPU.
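As a quick sanity check (a minimal sketch, assuming TensorFlow 2.x), you can confirm that TensorFlow actually sees a GPU before starting training:

import tensorflow as tf

# Prints the GPUs visible to TensorFlow; an empty list means training
# will silently fall back to the CPU.
print(tf.config.list_physical_devices('GPU'))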

One performance optimization in your training loop is to use the model's call method instead of predict, and to wrap it in tf.function. predict is intended for batch inference and carries some overhead; for a single sample, call can be faster. More details on the difference can be found here. For your purposes, the modification could be:

class DQNAgent(object):

    def _build_pos_model(self): # compile the DNN
        # create the DNN model
        dnn = self.create_pos_dnn()
        
        opt = Adam(learning_rate=self.learning_rate) #, decay=self.epsilon_decay)
        dnn.compile(loss="categorical_crossentropy", optimizer=opt, metrics=['accuracy'])
        dnn.call = tf.function(dnn.call)
        
        return dnn

Then change every call to self.model_uav_pos.predict(...) and self.target_model_uav_pos.predict(...) into self.model_uav_pos(...) and self.target_model_uav_pos(...) respectively.
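For example, the affected lines could look like this (a sketch of only the changed lines, assuming the rest of your methods stay as posted; note that a direct call returns an eager tensor, so target_f needs a .numpy() conversion before the item assignment):

    def act_upon_choosing_a_new_position(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(n_possible_movements)
        pos = np.array([state[0]])
        reqs = np.array([state[1]])
        act_values = self.model_uav_pos([pos, reqs])  # direct call instead of predict()
        return np.argmax(act_values[0])

and inside the minibatch loop of train_pos_nn:

            if not done:
                target = reward + DISCOUNT * np.amax(self.target_model_uav_pos([pos_next, reqs_next]))
            else:
                target = reward

            target_f = self.model_uav_pos([pos, reqs]).numpy()  # eager tensor -> numpy for item assignment
            target_f[0][action] = target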

A further potential optimization could be to JIT compile the TF function by supplying jit_compile=True to the tf.function wrapper, e.g.:

dnn.call = tf.function(dnn.call, jit_compile=True)

Update

It looks like using the call method instead of predict, wrapping the call method in tf.function, and using JIT compilation improved performance by more than 2x (5 s -> 2 s), which is a noticeable difference. For further optimization, although I don't think it will get you much further, instead of wrapping only call, the other computations that follow call can also be wrapped in tf.function, so that they all become a single callable TensorFlow graph. For example:

        act_values = self.model_uav_pos([pos, reqs])
        return np.argmax(act_values[0]) 

Instead of calling np.argmax after call, we can use tf.argmax and then wrap both in a tf.function. The modified implementation could then be:

class DQNAgent(object):
    def __init__(self):
        #self.gamma = 0.95 
        self.epsilon = 1.0
        self.epsilon_decay = 0.8
        self.epsilon_min = 0.1
        self.learning_rate = 10e-4 
        self.tau = 1e-3
                        
        # Main models
        self.model_uav_pos = self._build_pos_model()
        self.pred_model_uav = tf.function(lambda x: tf.argmax(self.model_uav_pos(x)[0]), jit_compile=True)  # argmax over the single sample's action scores

        # Target networks
        self.target_model_uav_pos = self._build_pos_model()
        # Copy weights
        self.target_model_uav_pos.set_weights(self.model_uav_pos.get_weights())
        self.pred_target_model_uav = tf.function(lambda x: tf.reduce_max(self.target_model_uav_pos(x)), jit_compile=True)

Then replace each call that was swapped in the originally proposed solution with the corresponding new prediction method defined here (e.g., instead of self.model_uav_pos(...), call self.pred_model_uav(...)), and remove the numpy function calls applied after prediction. Note that in this implementation, dnn.call = tf.function(dnn.call) has been removed from _build_pos_model, since the wrapping is done later.
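Concretely (a sketch of just the two call sites, with everything else unchanged), removing the numpy post-processing leaves:

        # in act_upon_choosing_a_new_position: the argmax now runs inside the compiled function
        return int(self.pred_model_uav([pos, reqs]))

        # in train_pos_nn: the max over the target network's output is also inside the compiled function
        target = reward + DISCOUNT * float(self.pred_target_model_uav([pos_next, reqs_next]))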

The benefit of this approach is that, by JIT compiling the additional computation applied to the result (the argmax and the max), the graph can be optimized further by fusing operations. Some additional details on this idea, together with a simple example involving softmax, can be found here.

As I said, I don't think this will lead to another big improvement, but it might shave a bit more time off the loop.

Update 2

I'm going to revise the suggestion from the last update, because I realized that model_uav_pos is called for inference in two places: once in act_upon_choosing_a_new_position, where it is followed by an argmax, and once in train_pos_nn, where only the raw output is used. I suggest additionally wrapping the call method of model_uav_pos with tf.function after defining self.pred_model_uav, so that both inference paths are compiled into TensorFlow graphs:

class DQNAgent(object):
    def __init__(self):
        #self.gamma = 0.95 
        self.epsilon = 1.0
        self.epsilon_decay = 0.8
        self.epsilon_min = 0.1
        self.learning_rate = 10e-4 
        self.tau = 1e-3
                        
        # Main models
        self.model_uav_pos = self._build_pos_model()
        self.pred_model_uav = tf.function(lambda x: tf.argmax(self.model_uav_pos(x)[0]), jit_compile=True)  # argmax over the single sample's action scores
        self.model_uav_pos.call = tf.function(self.model_uav_pos.call, jit_compile=True)

...

Then, in the act_upon_choosing_a_new_position method, use self.pred_model_uav, and in the train_pos_nn method simply call self.model_uav_pos, as described in the original solution.
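For completeness, the target_f computation in train_pos_nn would then use the wrapped call directly (a sketch of just those lines; pred_target_model_uav from the previous update is assumed to still be used for the target value):

            # call is wrapped in tf.function, so this runs as a compiled graph;
            # .numpy() is needed because eager tensors don't support item assignment
            target_f = self.model_uav_pos([pos, reqs]).numpy()
            target_f[0][action] = target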