PyTorch 入门指南
学习 PyTorch
图像和视频
音频
后端
强化学习
在生产环境中部署 PyTorch 模型
Profiling PyTorch
代码变换与FX
前端API
扩展 PyTorch
模型优化
并行和分布式训练
边缘端的 ExecuTorch
推荐系统
多模态

结合使用 Distributed DataParallel 与 Distributed RPC 框架

作者: Pritam DamaniaYi Wang

本教程通过一个简单的示例,演示了如何将 DistributedDataParallel (DDP) 与 Distributed RPC 框架 结合使用,以将分布式数据并行与分布式模型并行相结合来训练一个简单的模型。示例的源代码可以在这里找到。

之前的教程,分布式数据并行入门分布式 RPC 框架入门,分别介绍了如何执行分布式数据并行和分布式模型并行训练。然而,在某些训练场景中,您可能希望将这两种技术结合起来。例如:

  1. 如果我们有一个包含稀疏部分(大型嵌入表)和密集部分(全连接层)的模型,我们可能希望将嵌入表放在参数服务器上,并使用 DistributedDataParallel 在多个训练器上复制全连接层。Distributed RPC 框架 可用于在参数服务器上执行嵌入查找。

  2. 按照 PipeDream 论文中描述的方式启用混合并行。我们可以使用 Distributed RPC 框架 在多个工作节点之间流水线化模型的各个阶段,并使用 DistributedDataParallel 复制每个阶段(如果需要)。

在本教程中,我们将涵盖上述提到的第一种情况。在我们的设置中,总共有4个工作节点,如下所示:

  1. 1 个 Master,负责在参数服务器上创建嵌入表(nn.EmbeddingBag)。Master 还负责在两个 Trainer 上驱动训练循环。

  2. 1 个参数服务器,主要用于在内存中保存嵌入表,并响应来自 Master 和 Trainer 的 RPC。

  3. 2 个 Trainer,存储一个全连接层(nn.Linear),该层使用 DistributedDataParallel 在它们之间进行复制。Trainer 还负责执行前向传播、反向传播和优化器步骤。

整个训练过程按如下方式执行:

  1. 主节点创建一个RemoteModule,用于在参数服务器上存储嵌入表。

  2. 然后,主节点在训练器上启动训练循环,并将远程模块传递给训练器。

  3. 训练器创建一个HybridModel,该模型首先使用主节点提供的远程模块执行嵌入查找,然后执行封装在DDP中的全连接层。

  4. 训练器执行模型的前向传播,并使用损失通过Distributed Autograd执行反向传播。

  5. 作为反向传播的一部分,首先计算全连接层的梯度,并通过DDP中的allreduce同步到所有训练器。

  6. 接着,Distributed Autograd将梯度传播到参数服务器,更新嵌入表的梯度。

  7. 最后,使用Distributed Optimizer更新所有参数。

在将 DDP 和 RPC 结合使用时,您应始终为反向传播使用 分布式自动求导

现在,让我们详细讲解每个部分。首先,我们需要在开始任何训练之前设置所有工作节点。我们创建了4个进程,其中rank 0和rank 1是我们的训练器,rank 2是主节点,rank 3是参数服务器。

我们在所有4个工作节点上使用TCP init_method初始化RPC框架。RPC初始化完成后,主节点在参数服务器上使用RemoteModule创建一个包含EmbeddingBag层的远程模块。然后,主节点遍历每个训练器,并使用rpc_async在每个训练器上调用_run_trainer来启动训练循环。最后,主节点等待所有训练完成后再退出。

训练器首先使用 init_process_group 为 DDP 初始化一个 ProcessGroup,设置 world_size=2(对应两个训练器)。接着,他们使用 TCP 的 init_method 初始化 RPC 框架。需要注意的是,RPC 初始化和 ProcessGroup 初始化中使用的端口是不同的,这是为了避免两个框架初始化时的端口冲突。初始化完成后,训练器会等待来自主节点的 _run_trainer RPC 调用。

参数服务器仅初始化 RPC 框架,并等待来自训练器和主节点的 RPC 调用。

defrun_worker(rank, world_size):
r"""
    A wrapper function that initializes RPC, calls the function, and shuts down
    RPC.
    """

    # We need to use different port numbers in TCP init_method for init_rpc and
    # init_process_group to avoid port conflicts.
    rpc_backend_options = TensorPipeRpcBackendOptions()
    rpc_backend_options.init_method = "tcp://localhost:29501"

    # Rank 2 is master, 3 is ps and 0 and 1 are trainers.
    if rank == 2:
        rpc.init_rpc(
            "master",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        remote_emb_module = RemoteModule(
            "ps",
            torch.nn.EmbeddingBag,
            args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
            kwargs={"mode": "sum"},
        )

        # Run the training loop on trainers.
        futs = []
        for trainer_rank in [0, 1]:
            trainer_name = "trainer{}".format(trainer_rank)
            fut = rpc.rpc_async(
                trainer_name, _run_trainer, args=(remote_emb_module, trainer_rank)
            )
            futs.append(fut)

        # Wait for all training to finish.
        for fut in futs:
            fut.wait()
    elif rank <= 1:
        # Initialize process group for Distributed DataParallel on trainers.
        dist.init_process_group(
            backend="gloo", rank=rank, world_size=2, init_method="tcp://localhost:29500"
        )

        # Initialize RPC.
        trainer_name = "trainer{}".format(rank)
        rpc.init_rpc(
            trainer_name,
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        # Trainer just waits for RPCs from master.
    else:
        rpc.init_rpc(
            "ps",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )
        # parameter server do nothing
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__ == "__main__":
    # 2 trainers, 1 parameter server, 1 master.
    world_size = 4
    mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True)

在深入讨论 Trainer 的细节之前,我们先来介绍一下 Trainer 所使用的 HybridModel。如下所述,HybridModel 通过一个在参数服务器上存储嵌入表的远程模块(remote_emb_module)以及用于 DDP 的 device 进行初始化。模型的初始化过程将 nn.Linear 层封装在 DDP 中,以便在所有 Trainer 之间复制和同步该层。

模型的前向传播方法非常简单。它使用 RemoteModule 的 forward 方法在参数服务器上执行嵌入查找,并将其输出传递到全连接(FC)层。

classHybridModel(torch.nn.Module):
r"""
    The model consists of a sparse part and a dense part.
    1) The dense part is an nn.Linear module that is replicated across all trainers using DistributedDataParallel.
    2) The sparse part is a Remote Module that holds an nn.EmbeddingBag on the parameter server.
    This remote model can get a Remote Reference to the embedding table on the parameter server.
    """

    def__init__(self, remote_emb_module, device):
        super(HybridModel, self).__init__()
        self.remote_emb_module = remote_emb_module
        self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device])
        self.device = device

    defforward(self, indices, offsets):
        emb_lookup = self.remote_emb_module.forward(indices, offsets)
        return self.fc(emb_lookup.cuda(self.device))

接下来,我们来看看Trainer的配置。Trainer首先使用一个远程模块创建了上面描述的HybridModel,该远程模块将嵌入表保存在参数服务器上,并使用其自身的rank。

现在,我们需要获取一个RRefs的列表,这些RRefs指向我们希望通过DistributedOptimizer优化的所有参数。为了从参数服务器中获取嵌入表的参数,我们可以调用RemoteModule的remote_parameters,该方法基本上会遍历嵌入表的所有参数并返回一个RRefs列表。Trainer通过RPC在参数服务器上调用此方法,以获取所需参数的RRefs列表。由于DistributedOptimizer总是接受一个需要优化的参数的RRefs列表,因此我们甚至需要为本地FC层的参数创建RRefs。这是通过遍历model.fc.parameters(),为每个参数创建一个RRef并将其附加到remote_parameters()返回的列表中来实现的。注意,我们不能使用model.parameters(),因为它会递归调用model.remote_emb_module.parameters(),而RemoteModule不支持这种方式。

最后,我们使用所有的 RRef 创建 DistributedOptimizer,并定义一个 CrossEntropyLoss 函数。

def_run_trainer(remote_emb_module, rank):
r"""
    Each trainer runs a forward pass which involves an embedding lookup on the
    parameter server and running nn.Linear locally. During the backward pass,
    DDP is responsible for aggregating the gradients for the dense part
    (nn.Linear) and distributed autograd ensures gradients updates are
    propagated to the parameter server.
    """

    # Setup the model.
    model = HybridModel(remote_emb_module, rank)

    # Retrieve all model parameters as rrefs for DistributedOptimizer.

    # Retrieve parameters for embedding table.
    model_parameter_rrefs = model.remote_emb_module.remote_parameters()

    # model.fc.parameters() only includes local parameters.
    # NOTE: Cannot call model.parameters() here,
    # because this will call remote_emb_module.parameters(),
    # which supports remote_parameters() but not parameters().
    for param in model.fc.parameters():
        model_parameter_rrefs.append(RRef(param))

    # Setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model_parameter_rrefs,
        lr=0.05,
    )

    criterion = torch.nn.CrossEntropyLoss()

现在我们已经准备好介绍在每个训练器上运行的主要训练循环。get_next_batch 只是一个辅助函数,用于生成随机的输入和目标以进行训练。我们为多个epoch运行训练循环,并为每个batch执行以下操作:

  1. 为分布式自动求导设置一个分布式自动求导上下文

  2. 运行模型的前向传播并获取其输出。

  3. 使用损失函数根据输出和目标计算损失。

  4. 使用分布式自动求导执行基于损失的分布式反向传播。

  5. 最后,运行分布式优化器步骤以优化所有参数。

    defget_next_batch(rank):
        for _ in range(10):
            num_indices = random.randint(20, 50)
            indices = torch.LongTensor(num_indices).random_(0, NUM_EMBEDDINGS)

            # Generate offsets.
            offsets = []
            start = 0
            batch_size = 0
            while start < num_indices:
                offsets.append(start)
                start += random.randint(1, 10)
                batch_size += 1

            offsets_tensor = torch.LongTensor(offsets)
            target = torch.LongTensor(batch_size).random_(8).cuda(rank)
            yield indices, offsets_tensor, target

    # Train for 100 epochs
    for epoch in range(100):
        # create distributed autograd context
        for indices, offsets, target in get_next_batch(rank):
            with dist_autograd.context() as context_id:
                output = model(indices, offsets)
                loss = criterion(output, target)

                # Run distributed backward pass
                dist_autograd.backward(context_id, [loss])

                # Tun distributed optimizer
                opt.step(context_id)

                # Not necessary to zero grads as each iteration creates a different
                # distributed autograd context which hosts different grads
        print("Training done for epoch {}".format(epoch))

整个示例的源代码可以在这里找到。

本页目录