PyTorch 入门指南
学习 PyTorch
图像和视频
音频
后端
强化学习
在生产环境中部署 PyTorch 模型
Profiling PyTorch
代码变换与FX
前端API
扩展 PyTorch
模型优化
并行和分布式训练
边缘端的 ExecuTorch
推荐系统
多模态

使用异步执行实现批量 RPC 处理

作者: Shen Li

前提条件:

本教程演示了如何使用 @rpc.functions.async_execution 装饰器构建批处理 RPC 应用程序,该装饰器通过减少被阻塞的 RPC 线程数量并在被调用方上整合 CUDA 操作来加速训练。这与 使用 TorchServe 进行批量推理 的思路相同。

本教程需要 PyTorch v1.6.0 或更高版本。

基础概念

之前的教程已经展示了使用 torch.distributed.rpc 构建分布式训练应用程序的步骤,但它们并未详细说明在处理 RPC 请求时被调用方会发生什么。从 PyTorch v1.5 开始,每个 RPC 请求都会在被调用方阻塞一个线程以执行该请求中的函数,直到该函数返回。这对于许多用例来说都有效,但存在一个需要注意的问题。如果用户函数在 IO 上阻塞,例如嵌套的 RPC 调用,或者在信号上阻塞,例如等待另一个 RPC 请求解除阻塞,那么被调用方的 RPC 线程将不得不空闲等待,直到 IO 完成或信号事件发生。因此,RPC 被调用方可能会使用比实际需要更多的线程。这个问题的原因是 RPC 将用户函数视为黑盒,几乎不了解函数内部发生了什么。为了允许用户函数释放 RPC 线程,需要向 RPC 系统提供更多提示。

自 v1.6.0 版本起,PyTorch 通过引入两个新概念来解决这个问题:

  • 一个 torch.futures.Future 类型,封装了异步执行,并且支持安装回调函数。

  • 一个 @rpc.functions.async_execution 装饰器,允许应用程序告知被调用者目标函数将返回一个 future,并且可以在执行过程中暂停和多次 yield。

借助这两个工具,应用程序代码可以将用户函数拆分为多个较小的函数,将它们作为回调函数链接到 Future 对象上,并返回包含最终结果的 Future 对象。在被调用方,当获取到 Future 对象时,它也会将后续的 RPC 响应准备和通信作为回调函数安装,这些回调函数将在最终结果准备就绪时触发。通过这种方式,被调用方不再需要阻塞一个线程并等待最终返回值准备就绪。有关简单示例,请参阅 @rpc.functions.async_execution 的 API 文档。

除了减少被调用方的空闲线程数量外,这些工具还有助于使批量 RPC 处理更简单、更快速。本教程的以下两个部分将演示如何使用 @rpc.functions.async_execution 装饰器构建分布式批量更新参数服务器和批量处理强化学习应用程序。

参数服务器的批量更新

考虑一个带有一个参数服务器(PS)和多个训练器的同步参数服务器训练应用程序。在这个应用程序中,PS持有参数并等待所有训练器报告梯度。在每次迭代中,它会等待直到接收到所有训练器的梯度,然后一次性更新所有参数。下面的代码展示了PS类的实现。update_and_fetch_model方法使用@rpc.functions.async_execution装饰,并将被训练器调用。每次调用都会返回一个Future对象,该对象将被填充更新后的模型。大多数训练器发起的调用只是将梯度累加到.grad字段,立即返回,并让出PS上的RPC线程。最后到达的训练器将触发优化器步骤并消耗所有先前报告的梯度。然后,它将用更新后的模型设置future_model,进而通过Future对象通知所有来自其他训练器的先前请求,并向所有训练器发送更新后的模型。

importthreading
importtorchvision
importtorch
importtorch.distributed.rpcasrpc
fromtorchimport optim

num_classes, batch_update_size = 30, 5

classBatchUpdateParameterServer(object):
    def__init__(self, batch_update_size=batch_update_size):
        self.model = torchvision.models.resnet50(num_classes=num_classes)
        self.lock = threading.Lock()
        self.future_model = torch.futures.Future()
        self.batch_update_size = batch_update_size
        self.curr_update_size = 0
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        for p in self.model.parameters():
            p.grad = torch.zeros_like(p)

    defget_model(self):
        return self.model

    @staticmethod
    @rpc.functions.async_execution
    defupdate_and_fetch_model(ps_rref, grads):
        # Using the RRef to retrieve the local PS instance
        self = ps_rref.local_value()
        with self.lock:
            self.curr_update_size += 1
            # accumulate gradients into .grad field
            for p, g in zip(self.model.parameters(), grads):
                p.grad += g

            # Save the current future_model and return it to make sure the
            # returned Future object holds the correct model even if another
            # thread modifies future_model before this thread returns.
            fut = self.future_model

            if self.curr_update_size >= self.batch_update_size:
                # update the model
                for p in self.model.parameters():
                    p.grad /= self.batch_update_size
                self.curr_update_size = 0
                self.optimizer.step()
                self.optimizer.zero_grad()
                # by settiing the result on the Future object, all previous
                # requests expecting this updated model will be notified and
                # the their responses will be sent accordingly.
                fut.set_result(self.model)
                self.future_model = torch.futures.Future()

        return fut

对于训练器(trainer),它们都使用来自参数服务器(PS)的同一组参数进行初始化。在每次迭代中,每个训练器首先运行前向传播和反向传播以在本地生成梯度。然后,每个训练器通过RPC将其梯度报告给参数服务器,并通过同一RPC请求的返回值获取更新后的参数。在训练器的实现中,目标函数是否标记为@rpc.functions.async_execution并无区别。训练器只需使用rpc_sync调用update_and_fetch_model,这将阻塞训练器,直到返回更新后的模型。

batch_size, image_w, image_h  = 20, 64, 64

classTrainer(object):
    def__init__(self, ps_rref):
        self.ps_rref, self.loss_fn = ps_rref, torch.nn.MSELoss()
        self.one_hot_indices = torch.LongTensor(batch_size) \
                                    .random_(0, num_classes) \
                                    .view(batch_size, 1)

    defget_next_batch(self):
        for _ in range(6):
            inputs = torch.randn(batch_size, 3, image_w, image_h)
            labels = torch.zeros(batch_size, num_classes) \
                        .scatter_(1, self.one_hot_indices, 1)
            yield inputs.cuda(), labels.cuda()

    deftrain(self):
        name = rpc.get_worker_info().name
        # get initial model parameters
        m = self.ps_rref.rpc_sync().get_model().cuda()
        # start training
        for inputs, labels in self.get_next_batch():
            self.loss_fn(m(inputs), labels).backward()
            m = rpc.rpc_sync(
                self.ps_rref.owner(),
                BatchUpdateParameterServer.update_and_fetch_model,
                args=(self.ps_rref, [p.grad for p in m.cpu().parameters()]),
            ).cuda()

在本教程中,我们跳过了启动多个进程的代码,完整实现请参考示例仓库。请注意,即使不使用@rpc.functions.async_execution装饰器,也可以实现批处理。然而,这要么需要在参数服务器(PS)上阻塞更多的 RPC 线程,要么需要使用另一轮 RPC 来获取更新后的模型,后者会增加代码复杂性和通信开销。

本节通过一个简单的参数服务器训练示例,展示了如何使用 @rpc.functions.async_execution 装饰器来实现批量 RPC 应用程序。在下一节中,我们将使用批处理重新实现之前的 分布式 RPC 框架入门 教程中的强化学习示例,并展示其对训练速度的影响。

批处理 CartPole 求解器

本节以 OpenAI Gym 中的 CartPole-v1 为例,展示批处理 RPC 对性能的影响。请注意,由于我们的目标是演示 @rpc.functions.async_execution 的使用,而不是构建最佳的 CartPole 求解器或解决大多数不同的强化学习问题,因此我们采用了非常简单的策略和奖励计算策略,并专注于多观察者单代理的批处理 RPC 实现。我们使用了与前一教程类似的 Policy 模型,如下所示。与前一教程相比,不同之处在于其构造函数接受了一个额外的 batch 参数,该参数控制了 F.softmaxdim 参数,因为在批处理的情况下,forward 函数中的 x 参数包含来自多个观察者的状态,因此维度需要适当调整。其他部分保持不变。

importargparse
importtorch.nnasnn
importtorch.nn.functionalasF

parser = argparse.ArgumentParser(description='PyTorch RPC Batch RL example')
parser.add_argument('--gamma', type=float, default=1.0, metavar='G',
                    help='discount factor (default: 1.0)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
                    help='random seed (default: 543)')
parser.add_argument('--num-episode', type=int, default=10, metavar='E',
                    help='number of episodes (default: 10)')
args = parser.parse_args()

torch.manual_seed(args.seed)

classPolicy(nn.Module):
    def__init__(self, batch=True):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)
        self.dim = 2 if batch else 1

    defforward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=self.dim)

Observer 的构造函数也相应地进行了调整。它还接受一个 batch 参数,该参数控制它使用哪个 Agent 函数来选择动作。在批量模式下,它会调用 Agent 上的 select_action_batch 函数(稍后将介绍),并且该函数会用 @rpc.functions.async_execution 进行装饰。

importgym
importtorch.distributed.rpcasrpc

classObserver:
    def__init__(self, batch=True):
        self.id = rpc.get_worker_info().id - 1
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)
        self.select_action = Agent.select_action_batch if batch else Agent.select_action

与之前的教程 分布式 RPC 框架入门 相比,观察者的行为略有不同。它不是在被环境停止时退出,而是在每个回合中始终运行 n_steps 次迭代。当环境返回时,观察者会简单地重置环境并重新开始。通过这种设计,代理将从每个观察者接收固定数量的状态,因此可以将它们打包成一个固定大小的张量。在每一步中,Observer 使用 RPC 将其状态发送给 Agent,并通过返回值获取动作。在每个回合结束时,它将所有步骤的奖励返回给 Agent。需要注意的是,这个 run_episode 函数将由 Agent 通过 RPC 调用。因此,该函数中的 rpc_sync 调用将是一个嵌套的 RPC 调用。我们也可以将这个函数标记为 @rpc.functions.async_execution,以避免在 Observer 上阻塞一个线程。然而,由于瓶颈在于 Agent 而不是 Observer,因此在 Observer 进程上阻塞一个线程应该是可以接受的。

importtorch

classObserver:
    ...

    defrun_episode(self, agent_rref, n_steps):
        state, ep_reward = self.env.reset(), NUM_STEPS
        rewards = torch.zeros(n_steps)
        start_step = 0
        for step in range(n_steps):
            state = torch.from_numpy(state).float().unsqueeze(0)
            # send the state to the agent to get an action
            action = rpc.rpc_sync(
                agent_rref.owner(),
                self.select_action,
                args=(agent_rref, self.id, state)
            )

            # apply the action to the environment, and get the reward
            state, reward, done, _ = self.env.step(action)
            rewards[step] = reward

            if done or step + 1 >= n_steps:
                curr_rewards = rewards[start_step:(step + 1)]
                R = 0
                for i in range(curr_rewards.numel() -1, -1, -1):
                    R = curr_rewards[i] + args.gamma * R
                    curr_rewards[i] = R
                state = self.env.reset()
                if start_step == 0:
                    ep_reward = min(ep_reward, step - start_step + 1)
                start_step = step + 1

        return [rewards, ep_reward]

Agent 的构造函数还接受一个 batch 参数,用于控制动作概率的批处理方式。在批处理模式下,saved_log_probs 包含一个张量列表,其中每个张量包含来自所有观察者在单个步骤中的动作概率。如果没有启用批处理,saved_log_probs 则是一个字典,其中键是观察者 ID,值是该观察者的动作概率列表。

importthreading
fromtorch.distributed.rpcimport RRef

classAgent:
    def__init__(self, world_size, batch=True):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.policy = Policy(batch).cuda()
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.running_reward = 0

        for ob_rank in range(1, world_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
            self.ob_rrefs.append(rpc.remote(ob_info, Observer, args=(batch,)))
            self.rewards[ob_info.id] = []

        self.states = torch.zeros(len(self.ob_rrefs), 1, 4)
        self.batch = batch
        self.saved_log_probs = [] if batch else {k:[] for k in range(len(self.ob_rrefs))}
        self.future_actions = torch.futures.Future()
        self.lock = threading.Lock()
        self.pending_states = len(self.ob_rrefs)

非批处理的 select_acion 简单地将状态传递给策略,保存动作概率,并立即将动作返回给观察者。

fromtorch.distributionsimport Categorical

classAgent:
    ...

    @staticmethod
    defselect_action(agent_rref, ob_id, state):
        self = agent_rref.local_value()
        probs = self.policy(state.cuda())
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item()

在批处理过程中,状态被存储在二维张量 self.states 中,使用观察者的 ID 作为行 ID。然后,它通过向批量生成的 self.future_actions Future 对象安装回调函数来链接一个 Future,该 Future 对象将使用该观察者的 ID 来填充特定行的索引。最后到达的观察者会一次性将所有批处理状态通过策略运行,并相应地设置 self.future_actions。当这种情况发生时,安装在 self.future_actions 上的所有回调函数都会被触发,它们的返回值将被用来填充链接的 Future 对象,从而通知 Agent 准备并响应来自其他观察者的所有先前 RPC 请求。

classAgent:
    ...

    @staticmethod
    @rpc.functions.async_execution
    defselect_action_batch(agent_rref, ob_id, state):
        self = agent_rref.local_value()
        self.states[ob_id].copy_(state)
        future_action = self.future_actions.then(
            lambda future_actions: future_actions.wait()[ob_id].item()
        )

        with self.lock:
            self.pending_states -= 1
            if self.pending_states == 0:
                self.pending_states = len(self.ob_rrefs)
                probs = self.policy(self.states.cuda())
                m = Categorical(probs)
                actions = m.sample()
                self.saved_log_probs.append(m.log_prob(actions).t()[0])
                future_actions = self.future_actions
                self.future_actions = torch.futures.Future()
                future_actions.set_result(actions.cpu())
        return future_action

现在让我们来定义如何将不同的 RPC 函数连接在一起。Agent 控制着每个 episode 的执行。它首先使用 rpc_async 在所有 observers 上启动 episode,并阻塞在返回的 futures 上,这些 futures 将会被填充上 observers 的奖励。请注意,下面的代码使用了 RRef 辅助函数 ob_rref.rpc_async(),在 ob_rref RRef 的所有者上启动 run_episode 函数,并传入提供的参数。然后,它将保存的动作概率和返回的 observers 奖励转换为预期的数据格式,并启动训练步骤。最后,它重置所有状态并返回当前 episode 的奖励。这个函数是运行一个 episode 的入口点。

classAgent:
    ...

    defrun_episode(self, n_steps=0):
        futs = []
        for ob_rref in self.ob_rrefs:
            # make async RPC to kick off an episode on all observers
            futs.append(ob_rref.rpc_async().run_episode(self.agent_rref, n_steps))

        # wait until all obervers have finished this episode
        rets = torch.futures.wait_all(futs)
        rewards = torch.stack([ret[0] for ret in rets]).cuda().t()
        ep_rewards = sum([ret[1] for ret in rets]) / len(rets)

        # stack saved probs into one tensor
        if self.batch:
            probs = torch.stack(self.saved_log_probs)
        else:
            probs = [torch.stack(self.saved_log_probs[i]) for i in range(len(rets))]
            probs = torch.stack(probs)

        policy_loss = -probs * rewards / len(rets)
        policy_loss.sum().backward()
        self.optimizer.step()
        self.optimizer.zero_grad()

        # reset variables
        self.saved_log_probs = [] if self.batch else {k:[] for k in range(len(self.ob_rrefs))}
        self.states = torch.zeros(len(self.ob_rrefs), 1, 4)

        # calculate running rewards
        self.running_reward = 0.5 * ep_rewards + 0.5 * self.running_reward
        return ep_rewards, self.running_reward

其余代码是正常的进程启动和日志记录,与其他 RPC 教程类似。在本教程中,所有观察者都被动等待来自代理的命令。完整实现请参考 examples 仓库。

defrun_worker(rank, world_size, n_episode, batch, print_log=True):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # rank0 is the agent
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)

        agent = Agent(world_size, batch)
        for i_episode in range(n_episode):
            last_reward, running_reward = agent.run_episode(n_steps=NUM_STEPS)

            if print_log:
                print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                    i_episode, last_reward, running_reward))
    else:
        # other ranks are the observer
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
        # observers passively waiting for instructions from agents
    rpc.shutdown()


defmain():
    for world_size in range(2, 12):
        delays = []
        for batch in [True, False]:
            tik = time.time()
            mp.spawn(
                run_worker,
                args=(world_size, args.num_episode, batch),
                nprocs=world_size,
                join=True
            )
            tok = time.time()
            delays.append(tok - tik)

        print(f"{world_size}, {delays[0]}, {delays[1]}")


if __name__ == '__main__':
    main()

批处理 RPC 有助于将动作推理整合到更少的 CUDA 操作中,从而减少分摊的开销。上述 main 函数在批处理和非批处理模式下使用不同数量的观察者(从 1 到 10)运行相同的代码。下图使用默认参数值绘制了不同世界大小的执行时间。结果证实了我们的预期,即批处理有助于加速训练。

本页目录