PyTorch 入门指南
学习 PyTorch
图像和视频
音频
后端
强化学习
在生产环境中部署 PyTorch 模型
Profiling PyTorch
代码变换与FX
前端API
扩展 PyTorch
模型优化
并行和分布式训练
边缘端的 ExecuTorch
推荐系统
多模态

强化学习(DQN)教程

作者: Adam Paszke

Mark Towers

本教程展示了如何使用 PyTorch 在 Gymnasium 的 CartPole-v1 任务上训练一个深度 Q 学习(DQN)智能体。

您可能会发现阅读原始的 深度 Q 学习(DQN) 论文很有帮助。

任务

智能体需要在两个动作之间做出选择——将小车向左或向右移动——以使连接在其上的杆保持直立。您可以在 Gymnasium 的网站 上找到有关该环境以及其他更具挑战性的环境的更多信息。

CartPole

CartPole

当智能体观察到环境的当前状态并选择一个动作时,环境会转换到一个新的状态,并返回一个奖励,该奖励表示该动作的后果。在这个任务中,每个增量时间步的奖励为+1,如果杆子倾斜过度或小车移动超过中心2.4个单位,环境将终止。这意味着表现更好的场景将运行更长时间,累积更大的回报。

CartPole任务的设计使得智能体的输入是4个表示环境状态的实数值(位置、速度等)。我们不对这4个输入进行任何缩放,并将它们传递给一个具有2个输出的小型全连接网络,每个输出对应一个动作。网络被训练用于预测给定输入状态下每个动作的期望值。然后选择具有最高期望值的动作。

首先,让我们导入所需的包。首先,我们需要gymnasium作为环境,可以通过pip安装。这是原始OpenAI Gym项目的一个分支,自Gym v0.19以来由同一团队维护。如果您在Google Colab中运行此代码,请运行:

%%bash
pip3installgymnasium[classic_control]

我们还将使用 PyTorch 中的以下内容:

  • 神经网络 (torch.nn)

  • 优化 (torch.optim)

  • 自动微分 (torch.autograd)

importgymnasiumasgym
importmath
importrandom
importmatplotlib
importmatplotlib.pyplotasplt
fromcollectionsimport namedtuple, deque
fromitertoolsimport count

importtorch
importtorch.nnasnn
importtorch.optimasoptim
importtorch.nn.functionalasF

env = gym.make("CartPole-v1")

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    fromIPythonimport display

plt.ion()

# if GPU is to be used
device = torch.device(
    "cuda" if torch.cuda.is_available() else
    "mps" if torch.backends.mps.is_available() else
    "cpu"
)

经验回放内存

我们将使用经验回放记忆来训练我们的 DQN。它存储了智能体观察到的状态转换,使我们能够在以后重复使用这些数据。通过随机从中采样,构建批次的转换数据将不再相关。研究表明,这极大地稳定并改进了 DQN 的训练过程。

为此,我们需要两个类:

  • Transition - 一个命名元组,表示环境中的单次转换。它本质上将(状态,动作)对映射到它们的(下一个状态,奖励)结果,其中状态是稍后描述的屏幕差异图像。

  • ReplayMemory - 一个有限大小的循环缓冲区,用于存储最近观察到的转换。它还实现了 .sample() 方法,用于随机选择一批转换进行训练。

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


classReplayMemory(object):

    def__init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    defpush(self, *args):
"""Save a transition"""
        self.memory.append(Transition(*args))

    defsample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def__len__(self):
        return len(self.memory)

现在,让我们来定义我们的模型。但在此之前,我们先快速回顾一下什么是 DQN。

DQN 算法

我们的环境是确定性的,因此为了简化起见,这里展示的所有方程也都是以确定性的方式表述的。在强化学习的文献中,这些方程通常还会包含对环境中的随机转移的期望。

我们的目标是训练一个策略,该策略试图最大化折现后的累积奖励 \(R_{t_0} = \sum_{t=t_0}^{\infty} \gamma^{t - t_0} r_t\),其中 \(R_{t_0}\) 也被称为回报。折现因子 \(\gamma\) 应该是一个介于 \(0\) 和 \(1\) 之间的常数,以确保求和收敛。较低的 \(\gamma\) 使得来自不确定的遥远未来的奖励对我们的智能体来说不如它可以较为确定的近期奖励重要。它还鼓励智能体收集时间上更接近的奖励,而不是那些在时间上遥远的等效奖励。

Q-learning 背后的主要思想是,如果我们有一个函数 \(Q^*: State \times Action \rightarrow \mathbb{R}\),它可以告诉我们如果在给定状态下采取某个动作会得到什么回报,那么我们就能轻松构建一个最大化奖励的策略:

\[\pi^*(s) = \arg\!\max_a \ Q^*(s, a) \]

然而,我们并不了解世界的所有信息,因此我们无法直接获得 \(Q^*\)。但是,由于神经网络是通用函数逼近器,我们可以简单地创建一个神经网络并训练它,使其近似于 \(Q^*\)。

对于我们的训练更新规则,我们将利用一个事实:任何策略的 \(Q\) 函数都遵循贝尔曼方程:

\[Q^{\pi}(s, a) = r + \gamma Q^{\pi}(s', \pi(s')) \]

等式两边的差值被称为时间差分误差(Temporal Difference Error),记作 \(\delta\):

\[\delta = Q(s, a) - (r + \gamma \max_a' Q(s', a)) \]

为了最小化这个误差,我们将使用 Huber 损失。Huber 损失在误差较小时表现类似于均方误差,但在误差较大时表现类似于平均绝对误差——这使得它在 \(Q\) 的估计非常嘈杂时对异常值更加鲁棒。我们通过对从回放记忆中采样的一批转换 \(B\) 来计算这个损失:

\[\mathcal{L} = \frac{1}{|B|}\sum_{(s, a, s', r) \ \in \ B} \mathcal{L}(\delta)\]

[\text{其中} \quad \mathcal{L}(\delta) = \begin{cases} \frac{1}{2}{\delta^2} & \text{当 } |\delta| \le 1, \\ |\delta| - \frac{1}{2} & \text{其他情况.} \end{cases}]

Q网络

我们的模型将是一个前馈神经网络,它接收当前屏幕图像与前一屏幕图像之间的差异作为输入。该模型有两个输出,分别表示 \(Q(s, \mathrm{left})\) 和 \(Q(s, \mathrm{right})\)(其中 \(s\) 是网络的输入)。实际上,该网络试图预测在当前输入下采取每个动作的期望回报

classDQN(nn.Module):

    def__init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    defforward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)

训练

超参数和工具

该单元格实例化了我们的模型及其优化器,并定义了一些实用工具:

  • select_action - 将根据 ε-greedy 策略选择一个动作。简单来说,我们有时会使用模型来选择动作,有时会随机均匀采样一个动作。选择随机动作的概率将从 EPS_START 开始,并以指数方式衰减至 EPS_ENDEPS_DECAY 控制衰减速率。

  • plot_durations - 一个辅助函数,用于绘制每个 episode 的持续时间,并计算最近 100 个 episode 的平均值(这是官方评估中使用的指标)。该图表将显示在主训练循环所在单元格的下方,并在每个 episode 结束后更新。

# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor as mentioned in the previous section
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR is the learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4

# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = len(state)

policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)


steps_done = 0


defselect_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return the largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1).indices.view(1, 1)
    else:
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)


episode_durations = []


defplot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())

训练循环

最后,这是我们训练模型的代码。

在这里,您可以找到一个 optimize_model 函数,它执行单步优化。该函数首先采样一个批次,将所有张量连接成一个,计算 \(Q(s_t, a_t)\) 和 \(V(s_{t+1}) = \max_a Q(s_{t+1}, a)\),然后将它们组合成我们的损失函数。根据定义,如果 \(s\) 是终止状态,则我们将 \(V(s) = 0\)。为了增加稳定性,我们还使用了一个目标网络来计算 \(V(s_{t+1})\)。目标网络在每一步通过由超参数 TAU 控制的软更新进行更新,该超参数之前已定义。

defoptimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1).values
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

下面,您可以看到主要的训练循环。开始时,我们重置环境并获取初始的 state Tensor。然后,我们采样一个动作,执行它,观察下一个状态和奖励(始终为 1),并对模型进行一次优化。当一集结束时(模型失败),我们重新启动循环。

如果 GPU 可用,num_episodes 被设置为 600,否则计划进行 50 集训练,以确保训练不会耗时过长。然而,50 集不足以在 CartPole 上观察到良好的性能。在 600 次训练集内,您应该看到模型持续达到 500 步。训练 RL 模型可能是一个不稳定的过程,因此如果未观察到收敛,重新启动训练可能会产生更好的结果。

if torch.cuda.is_available() or torch.backends.mps.is_available():
    num_episodes = 600
else:
    num_episodes = 50

for i_episode in range(num_episodes):
    # Initialize the environment and get its state
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        done = terminated or truncated

        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model()

        # Soft update of the target network's weights
        # θ′ ← τ θ + (1 −τ )θ′
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        target_net.load_state_dict(target_net_state_dict)

        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break

print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()

Result

/usr/local/lib/python3.10/dist-packages/gymnasium/utils/passive_env_checker.py:249: DeprecationWarning:

`np.bool8` is a deprecated alias for `np.bool_`.  (Deprecated NumPy 1.24)

Complete

下图展示了整体数据流的示意图。

../_images/reinforcement_learning_diagram.jpg

动作要么随机选择,要么根据策略选择,从 gym 环境中获取下一步的样本。我们将结果记录在回放内存中,并在每次迭代时运行优化步骤。优化从回放内存中随机选取一个批次来训练新策略。在优化过程中,还会使用“旧”的 target_net 来计算预期的 Q 值。每一步都会对其权重进行软更新。

本页目录