训练一个玩马里奥的强化学习智能体
作者: Yuansong Feng, Suraj Subramanian, Howard Wang, Steven Guo.
本教程将带您深入了解深度强化学习的基础知识。最后,您将实现一个由AI驱动的马里奥(使用Double Deep Q-Networks),它可以自己玩游戏。
虽然本教程不需要具备强化学习的先验知识,但您可以先熟悉这些RL概念,并准备这份方便的速查表作为参考。完整代码可在此处获取。
%%bash
pipinstallgym-super-mario-bros==7.4.0
pipinstalltensordict==0.3.0
pipinstalltorchrl==0.3.0
importtorch
fromtorchimport nn
fromtorchvisionimport transforms as T
fromPILimport Image
importnumpyasnp
frompathlibimport Path
fromcollectionsimport deque
importrandom,datetime,os
# Gym is an OpenAI toolkit for RL
importgym
fromgym.spacesimport Box
fromgym.wrappersimport FrameStack
# NES Emulator for OpenAI Gym
fromnes_py.wrappersimport JoypadSpace
# Super Mario environment for OpenAI Gym
importgym_super_mario_bros
fromtensordictimport TensorDict
fromtorchrl.dataimport TensorDictReplayBuffer, LazyMemmapStorage
强化学习定义
环境 智能体与之交互并从中学习的世界。
动作 \(a\):智能体对环境做出的响应。所有可能的动作集合称为动作空间。
状态 \(s\):环境的当前特征。环境可能处于的所有可能状态的集合称为状态空间。
奖励 \(r\):奖励是环境向智能体提供的关键反馈。它是驱动智能体学习并改变其未来动作的关键。在多个时间步上奖励的累积称为回报。
最优动作-价值函数 \(Q^*(s,a)\):给出了从状态 \(s\) 开始,采取任意动作 \(a\),然后在每个未来时间步采取最大化回报的动作的期望回报。\(Q\) 可以被视为在某个状态下动作的“质量”。我们尝试逼近这个函数。
环境
初始化环境
在超级马里奥中,环境由管道、蘑菇和其他元素组成。
当马里奥执行一个动作时,环境会以变化后的(下一个)状态、奖励以及其他信息作为响应。
# Initialize Super Mario environment (in v0.26 change render mode to 'human' to see results on the screen)
if gym.__version__ < '0.26':
env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0", new_step_api=True)
else:
env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0", render_mode='rgb', apply_api_compatibility=True)
# Limit the action-space to
# 0. walk right
# 1. jump right
env = JoypadSpace(env, [["right"], ["right", "A"]])
env.reset()
next_state, reward, done, trunc, info = env.step(action=0)
print(f"{next_state.shape},\n{reward},\n{done},\n{info}")
/usr/local/lib/python3.10/dist-packages/gym/envs/registration.py:555: UserWarning:
WARN: The environment SuperMarioBros-1-1-v0 is out of date. You should consider upgrading to version `v3`.
/usr/local/lib/python3.10/dist-packages/gym/envs/registration.py:627: UserWarning:
WARN: The environment creator metadata doesn't include `render_modes`, contains: ['render.modes', 'video.frames_per_second']
/usr/local/lib/python3.10/dist-packages/gym/utils/passive_env_checker.py:233: DeprecationWarning:
`np.bool8` is a deprecated alias for `np.bool_`. (Deprecated NumPy 1.24)
(240, 256, 3),
0.0,
False,
{'coins': 0, 'flag_get': False, 'life': 2, 'score': 0, 'stage': 1, 'status': 'small', 'time': 400, 'world': 1, 'x_pos': 40, 'y_pos': 79}
环境预处理
环境数据会在 next_state
中返回给代理。正如您在上面所看到的,每个状态由一个大小为 [3, 240, 256]
的数组表示。通常,这些信息比我们的代理需要的更多;例如,马里奥的动作并不依赖于管道或天空的颜色!
我们使用**包装器(Wrappers)**在将环境数据发送给代理之前进行预处理。
GrayScaleObservation
是一个常用的包装器,用于将 RGB 图像转换为灰度图像;这样做可以减少状态表示的大小,而不会丢失有用的信息。现在每个状态的大小为:[1, 240, 256]
ResizeObservation
将每个观测值下采样为一个方形图像。新的大小为:[1, 84, 84]
SkipFrame
是一个自定义的包装器,继承自 gym.Wrapper
并实现了 step()
函数。由于连续帧之间的变化不大,我们可以跳过 n 个中间帧而不会丢失太多信息。第 n 帧会累积每个跳过的帧所获得的奖励。
FrameStack
是一个包装器,它允许我们将环境的连续帧压缩为单个观测点,以供学习模型使用。通过这种方式,我们可以根据马里奥在前几帧中的移动方向来判断他是在着陆还是跳跃。
classSkipFrame(gym.Wrapper):
def__init__(self, env, skip):
"""Return only every `skip`-th frame"""
super().__init__(env)
self._skip = skip
defstep(self, action):
"""Repeat action, and sum reward"""
total_reward = 0.0
for i in range(self._skip):
# Accumulate reward and repeat the same action
obs, reward, done, trunk, info = self.env.step(action)
total_reward += reward
if done:
break
return obs, total_reward, done, trunk, info
classGrayScaleObservation(gym.ObservationWrapper):
def__init__(self, env):
super().__init__(env)
obs_shape = self.observation_space.shape[:2]
self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)
defpermute_orientation(self, observation):
# permute [H, W, C] array to [C, H, W] tensor
observation = np.transpose(observation, (2, 0, 1))
observation = torch.tensor(observation.copy(), dtype=torch.float)
return observation
defobservation(self, observation):
observation = self.permute_orientation(observation)
transform = T.Grayscale()
observation = transform(observation)
return observation
classResizeObservation(gym.ObservationWrapper):
def__init__(self, env, shape):
super().__init__(env)
if isinstance(shape, int):
self.shape = (shape, shape)
else:
self.shape = tuple(shape)
obs_shape = self.shape + self.observation_space.shape[2:]
self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)
defobservation(self, observation):
transforms = T.Compose(
[T.Resize(self.shape, antialias=True), T.Normalize(0, 255)]
)
observation = transforms(observation).squeeze(0)
return observation
# Apply Wrappers to environment
env = SkipFrame(env, skip=4)
env = GrayScaleObservation(env)
env = ResizeObservation(env, shape=84)
if gym.__version__ < '0.26':
env = FrameStack(env, num_stack=4, new_step_api=True)
else:
env = FrameStack(env, num_stack=4)
在对环境应用了上述包装器后,最终的包装状态由4个灰度化的连续帧堆叠在一起组成,如上图左侧所示。每次马里奥执行一个动作,环境都会以这种结构的状态做出响应。该结构由一个大小为[4, 84, 84]
的3维数组表示。
代理
我们创建一个 Mario
类来表示游戏中的角色。Mario 应该能够:
-
行动:根据当前环境状态的最优行动策略执行操作。
-
记忆经验。经验 = (当前状态, 当前行动, 奖励, 下一个状态)。Mario 缓存并在之后回忆他的经验,以更新他的行动策略。
-
学习:随着时间的推移,学习更好的行动策略。
classMario:
def__init__():
pass
defact(self, state):
"""Given a state, choose an epsilon-greedy action"""
pass
defcache(self, experience):
"""Add the experience to memory"""
pass
defrecall(self):
"""Sample experiences from memory"""
pass
deflearn(self):
"""Update online action value (Q) function with a batch of experiences"""
pass
在接下来的部分中,我们将填充 Mario 的参数并定义他的函数。
行动
对于任何给定的状态,代理可以选择执行最优操作(利用)或随机操作(探索)。\ Mario 以 self.exploration_rate
的概率随机探索;当他选择利用时,他会依赖 MarioNet
(在 Learn
部分实现)来提供最优操作。
classMario:
def__init__(self, state_dim, action_dim, save_dir):
self.state_dim = state_dim
self.action_dim = action_dim
self.save_dir = save_dir
self.device = "cuda" if torch.cuda.is_available() else "cpu"
# Mario's DNN to predict the most optimal action - we implement this in the Learn section
self.net = MarioNet(self.state_dim, self.action_dim).float()
self.net = self.net.to(device=self.device)
self.exploration_rate = 1
self.exploration_rate_decay = 0.99999975
self.exploration_rate_min = 0.1
self.curr_step = 0
self.save_every = 5e5 # no. of experiences between saving Mario Net
defact(self, state):
"""
Given a state, choose an epsilon-greedy action and update value of step.
Inputs:
state(``LazyFrame``): A single observation of the current state, dimension is (state_dim)
Outputs:
``action_idx`` (``int``): An integer representing which action Mario will perform
"""
# EXPLORE
if np.random.rand() < self.exploration_rate:
action_idx = np.random.randint(self.action_dim)
# EXPLOIT
else:
state = state[0].__array__() if isinstance(state, tuple) else state.__array__()
state = torch.tensor(state, device=self.device).unsqueeze(0)
action_values = self.net(state, model="online")
action_idx = torch.argmax(action_values, axis=1).item()
# decrease exploration_rate
self.exploration_rate *= self.exploration_rate_decay
self.exploration_rate = max(self.exploration_rate_min, self.exploration_rate)
# increment step
self.curr_step += 1
return action_idx
缓存与回忆
这两个函数相当于 Mario 的“记忆”过程。
cache()
: 每次 Mario 执行一个动作时,他都会将 experience
存储到记忆中。他的经验包括当前的状态、执行的动作、动作的奖励、下一个状态,以及游戏是否结束。
recall()
: Mario 从记忆中随机抽取一批经验,并利用这些经验来学习游戏。
classMario(Mario): # subclassing for continuity
def__init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.memory = TensorDictReplayBuffer(storage=LazyMemmapStorage(100000, device=torch.device("cpu")))
self.batch_size = 32
defcache(self, state, next_state, action, reward, done):
"""
Store the experience to self.memory (replay buffer)
Inputs:
state (``LazyFrame``),
next_state (``LazyFrame``),
action (``int``),
reward (``float``),
done(``bool``))
"""
deffirst_if_tuple(x):
return x[0] if isinstance(x, tuple) else x
state = first_if_tuple(state).__array__()
next_state = first_if_tuple(next_state).__array__()
state = torch.tensor(state)
next_state = torch.tensor(next_state)
action = torch.tensor([action])
reward = torch.tensor([reward])
done = torch.tensor([done])
# self.memory.append((state, next_state, action, reward, done,))
self.memory.add(TensorDict({"state": state, "next_state": next_state, "action": action, "reward": reward, "done": done}, batch_size=[]))
defrecall(self):
"""
Retrieve a batch of experiences from memory
"""
batch = self.memory.sample(self.batch_size).to(self.device)
state, next_state, action, reward, done = (batch.get(key) for key in ("state", "next_state", "action", "reward", "done"))
return state, next_state, action.squeeze(), reward.squeeze(), done.squeeze()
学习
Mario 在底层使用了 DDQN 算法。DDQN 使用了两个卷积网络 - \(Q_{online}\) 和 \(Q_{target}\) - 它们独立地逼近最优动作值函数。
在我们的实现中,我们在 \(Q_{online}\) 和 \(Q_{target}\) 之间共享特征生成器 features
,但为每个网络维护了独立的全连接分类器。为了防止通过反向传播更新,\(\theta_{target}\)(即 \(Q_{target}\) 的参数)被冻结。相反,它会定期与 \(\theta_{online}\) 同步(稍后会详细介绍)。
神经网络
classMarioNet(nn.Module):
"""mini CNN structure
input -> (conv2d + relu) x 3 -> flatten -> (dense + relu) x 2 -> output
"""
def__init__(self, input_dim, output_dim):
super().__init__()
c, h, w = input_dim
if h != 84:
raise ValueError(f"Expecting input height: 84, got: {h}")
if w != 84:
raise ValueError(f"Expecting input width: 84, got: {w}")
self.online = self.__build_cnn(c, output_dim)
self.target = self.__build_cnn(c, output_dim)
self.target.load_state_dict(self.online.state_dict())
# Q_target parameters are frozen.
for p in self.target.parameters():
p.requires_grad = False
defforward(self, input, model):
if model == "online":
return self.online(input)
elif model == "target":
return self.target(input)
def__build_cnn(self, c, output_dim):
return nn.Sequential(
nn.Conv2d(in_channels=c, out_channels=32, kernel_size=8, stride=4),
nn.ReLU(),
nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2),
nn.ReLU(),
nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),
nn.ReLU(),
nn.Flatten(),
nn.Linear(3136, 512),
nn.ReLU(),
nn.Linear(512, output_dim),
)
TD 估计值 & TD 目标值
学习过程中涉及两个值:
TD 估计值 - 对于给定状态 \(s\) 预测的最优 \(Q^*\)
\[{TD}_e = Q_{online}^*(s,a)\]
TD 目标 - 当前奖励与下一状态 \(s'\) 的估计 \(Q^*\) 的聚合
\[a' = argmax_{a} Q_{online}(s', a)\]
\[{TD}_t = r + \gamma Q_{target}^*(s',a')\]
因为我们不知道下一个动作 \(a'\) 会是什么,所以我们使用在下一个状态 \(s'\) 中最大化 \(Q_{online}\) 的动作 \(a'\)。
请注意,我们在 td_target()
上使用了 @torch.no_grad() 装饰器来禁用梯度计算(因为我们不需要对 \(\theta_{target}\) 进行反向传播)。
classMario(Mario):
def__init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.gamma = 0.9
deftd_estimate(self, state, action):
current_Q = self.net(state, model="online")[
np.arange(0, self.batch_size), action
] # Q_online(s,a)
return current_Q
@torch.no_grad()
deftd_target(self, reward, next_state, done):
next_state_Q = self.net(next_state, model="online")
best_action = torch.argmax(next_state_Q, axis=1)
next_Q = self.net(next_state, model="target")[
np.arange(0, self.batch_size), best_action
]
return (reward + (1 - done.float()) * self.gamma * next_Q).float()
更新模型
当 Mario 从重放缓冲区中采样输入时,我们计算 \(TD_t\) 和 \(TD_e\),并将此损失反向传播到 \(Q_{online}\),以更新其参数 \(\theta_{online}\)(其中 \(\alpha\) 是传递给 optimizer
的学习率 lr
)。
\[\theta_{online} \leftarrow \theta_{online} + \alpha \nabla(TD_e - TD_t)\]
\(\theta_{target}\) 不会通过反向传播进行更新。相反,我们会定期将 \(\theta_{online}\) 复制到 \(\theta_{target}\)。
\[\theta_{target} \leftarrow \theta_{online}\]
classMario(Mario):
def__init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.optimizer = torch.optim.Adam(self.net.parameters(), lr=0.00025)
self.loss_fn = torch.nn.SmoothL1Loss()
defupdate_Q_online(self, td_estimate, td_target):
loss = self.loss_fn(td_estimate, td_target)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
defsync_Q_target(self):
self.net.target.load_state_dict(self.net.online.state_dict())
保存检查点
classMario(Mario):
defsave(self):
save_path = (
self.save_dir / f"mario_net_{int(self.curr_step//self.save_every)}.chkpt"
)
torch.save(
dict(model=self.net.state_dict(), exploration_rate=self.exploration_rate),
save_path,
)
print(f"MarioNet saved to {save_path} at step {self.curr_step}")
整合所有内容
classMario(Mario):
def__init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.burnin = 1e4 # min. experiences before training
self.learn_every = 3 # no. of experiences between updates to Q_online
self.sync_every = 1e4 # no. of experiences between Q_target & Q_online sync
deflearn(self):
if self.curr_step % self.sync_every == 0:
self.sync_Q_target()
if self.curr_step % self.save_every == 0:
self.save()
if self.curr_step < self.burnin:
return None, None
if self.curr_step % self.learn_every != 0:
return None, None
# Sample from memory
state, next_state, action, reward, done = self.recall()
# Get TD Estimate
td_est = self.td_estimate(state, action)
# Get TD Target
td_tgt = self.td_target(reward, next_state, done)
# Backpropagate loss through Q_online
loss = self.update_Q_online(td_est, td_tgt)
return (td_est.mean().item(), loss)
日志记录
importnumpyasnp
importtime,datetime
importmatplotlib.pyplotasplt
classMetricLogger:
def__init__(self, save_dir):
self.save_log = save_dir / "log"
with open(self.save_log, "w") as f:
f.write(
f"{'Episode':>8}{'Step':>8}{'Epsilon':>10}{'MeanReward':>15}"
f"{'MeanLength':>15}{'MeanLoss':>15}{'MeanQValue':>15}"
f"{'TimeDelta':>15}{'Time':>20}\n"
)
self.ep_rewards_plot = save_dir / "reward_plot.jpg"
self.ep_lengths_plot = save_dir / "length_plot.jpg"
self.ep_avg_losses_plot = save_dir / "loss_plot.jpg"
self.ep_avg_qs_plot = save_dir / "q_plot.jpg"
# History metrics
self.ep_rewards = []
self.ep_lengths = []
self.ep_avg_losses = []
self.ep_avg_qs = []
# Moving averages, added for every call to record()
self.moving_avg_ep_rewards = []
self.moving_avg_ep_lengths = []
self.moving_avg_ep_avg_losses = []
self.moving_avg_ep_avg_qs = []
# Current episode metric
self.init_episode()
# Timing
self.record_time = time.time()
deflog_step(self, reward, loss, q):
self.curr_ep_reward += reward
self.curr_ep_length += 1
if loss:
self.curr_ep_loss += loss
self.curr_ep_q += q
self.curr_ep_loss_length += 1
deflog_episode(self):
"Mark end of episode"
self.ep_rewards.append(self.curr_ep_reward)
self.ep_lengths.append(self.curr_ep_length)
if self.curr_ep_loss_length == 0:
ep_avg_loss = 0
ep_avg_q = 0
else:
ep_avg_loss = np.round(self.curr_ep_loss / self.curr_ep_loss_length, 5)
ep_avg_q = np.round(self.curr_ep_q / self.curr_ep_loss_length, 5)
self.ep_avg_losses.append(ep_avg_loss)
self.ep_avg_qs.append(ep_avg_q)
self.init_episode()
definit_episode(self):
self.curr_ep_reward = 0.0
self.curr_ep_length = 0
self.curr_ep_loss = 0.0
self.curr_ep_q = 0.0
self.curr_ep_loss_length = 0
defrecord(self, episode, epsilon, step):
mean_ep_reward = np.round(np.mean(self.ep_rewards[-100:]), 3)
mean_ep_length = np.round(np.mean(self.ep_lengths[-100:]), 3)
mean_ep_loss = np.round(np.mean(self.ep_avg_losses[-100:]), 3)
mean_ep_q = np.round(np.mean(self.ep_avg_qs[-100:]), 3)
self.moving_avg_ep_rewards.append(mean_ep_reward)
self.moving_avg_ep_lengths.append(mean_ep_length)
self.moving_avg_ep_avg_losses.append(mean_ep_loss)
self.moving_avg_ep_avg_qs.append(mean_ep_q)
last_record_time = self.record_time
self.record_time = time.time()
time_since_last_record = np.round(self.record_time - last_record_time, 3)
print(
f"Episode {episode} - "
f"Step {step} - "
f"Epsilon {epsilon} - "
f"Mean Reward {mean_ep_reward} - "
f"Mean Length {mean_ep_length} - "
f"Mean Loss {mean_ep_loss} - "
f"Mean Q Value {mean_ep_q} - "
f"Time Delta {time_since_last_record} - "
f"Time {datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}"
)
with open(self.save_log, "a") as f:
f.write(
f"{episode:8d}{step:8d}{epsilon:10.3f}"
f"{mean_ep_reward:15.3f}{mean_ep_length:15.3f}{mean_ep_loss:15.3f}{mean_ep_q:15.3f}"
f"{time_since_last_record:15.3f}"
f"{datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'):>20}\n"
)
for metric in ["ep_lengths", "ep_avg_losses", "ep_avg_qs", "ep_rewards"]:
plt.clf()
plt.plot(getattr(self, f"moving_avg_{metric}"), label=f"moving_avg_{metric}")
plt.legend()
plt.savefig(getattr(self, f"{metric}_plot"))
开始吧!
在这个示例中,我们运行了 40 轮的训练循环,但为了让马里奥真正掌握他所在世界的规则,我们建议至少运行 40,000 轮训练循环!
use_cuda = torch.cuda.is_available()
print(f"Using CUDA: {use_cuda}")
print()
save_dir = Path("checkpoints") / datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
save_dir.mkdir(parents=True)
mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir=save_dir)
logger = MetricLogger(save_dir)
episodes = 40
for e in range(episodes):
state = env.reset()
# Play the game!
while True:
# Run agent on the state
action = mario.act(state)
# Agent performs action
next_state, reward, done, trunc, info = env.step(action)
# Remember
mario.cache(state, next_state, action, reward, done)
# Learn
q, loss = mario.learn()
# Logging
logger.log_step(reward, loss, q)
# Update state
state = next_state
# Check if end of game
if done or info["flag_get"]:
break
logger.log_episode()
if (e % 20 == 0) or (e == episodes - 1):
logger.record(episode=e, epsilon=mario.exploration_rate, step=mario.curr_step)
Using CUDA: True
Episode 0 - Step 163 - Epsilon 0.9999592508251706 - Mean Reward 635.0 - Mean Length 163.0 - Mean Loss 0.0 - Mean Q Value 0.0 - Time Delta 1.897 - Time 2025-03-21T17:44:55
Episode 20 - Step 5007 - Epsilon 0.9987490329557962 - Mean Reward 667.429 - Mean Length 238.429 - Mean Loss 0.0 - Mean Q Value 0.0 - Time Delta 58.232 - Time 2025-03-21T17:45:53
Episode 39 - Step 8854 - Epsilon 0.9977889477081997 - Mean Reward 656.6 - Mean Length 221.35 - Mean Loss 0.0 - Mean Q Value 0.0 - Time Delta 46.245 - Time 2025-03-21T17:46:39
总结
在本教程中,我们了解了如何使用 PyTorch 训练一个游戏 AI。您可以使用相同的方法来训练 AI 玩 OpenAI gym 中的任何游戏。希望您喜欢本教程,如有任何问题,请随时在 我们的 GitHub 上联系我们!