PyTorch 入门指南
学习 PyTorch
图像和视频
音频
后端
强化学习
在生产环境中部署 PyTorch 模型
Profiling PyTorch
代码变换与FX
前端API
扩展 PyTorch
模型优化
并行和分布式训练
边缘端的 ExecuTorch
推荐系统
多模态

分布式数据并行入门

作者: Shen Li

编辑: Joe Zhu, Chirag Pandya

前提条件:

DistributedDataParallel(DDP)是 PyTorch 中一个强大的模块,它允许您在多台机器上并行化您的模型,使其非常适合大规模深度学习应用。要使用 DDP,您需要生成多个进程并为每个进程创建一个 DDP 实例。

但它是如何工作的呢?DDP 使用 torch.distributed 包中的集体通信来同步所有进程中的梯度和缓冲区。这意味着每个进程都会有自己的一份模型副本,但它们会协同工作,就像在单台机器上训练模型一样。

为了实现这一点,DDP 为模型中的每个参数注册了一个自动求导钩子。当反向传播运行时,该钩子会被触发,并在所有进程之间进行梯度同步。这确保了每个进程都有相同的梯度,然后使用这些梯度来更新模型。

要了解更多关于 DDP 的工作原理以及如何有效地使用它,请务必查看 DDP 设计说明。通过 DDP,您可以比以往更快、更高效地训练您的模型!

推荐的使用 DDP 的方式是为每个模型副本生成一个进程。模型副本可以跨多个设备。DDP 进程可以放置在同一台机器上,也可以跨多台机器。请注意,GPU 设备不能在 DDP 进程之间共享(即一个 GPU 对应一个 DDP 进程)。

在本教程中,我们将从一个基本的 DDP 用例开始,然后演示更高级的用例,包括模型检查点以及将 DDP 与模型并行结合使用。

本教程中的代码运行在 8-GPU 服务器上,但可以轻松推广到其他环境。

DataParallelDistributedDataParallel 的对比

在深入探讨之前,让我们先明确一下为什么尽管DistributedDataParallel增加了复杂性,您仍会考虑使用它而不是DataParallel

  • 首先,DataParallel 是单进程、多线程的,但它只能在单台机器上运行。相比之下,DistributedDataParallel 是多进程的,并且支持单机和多机训练。由于线程间的 GIL 竞争、每次迭代的模型复制,以及分散输入和收集输出引入的额外开销,即使在单台机器上,DataParallel 通常也比 DistributedDataParallel 慢。

  • 回想一下之前的教程,如果您的模型太大而无法适应单个 GPU,您必须使用模型并行将其拆分到多个 GPU 上。DistributedDataParallel 可以与模型并行一起使用,而 DataParallel 目前不支持。当 DDP 与模型并行结合时,每个 DDP 进程将使用模型并行,而所有进程将共同使用数据并行。

基本使用场景

要创建一个 DDP 模块,您首先需要正确设置进程组。更多详细信息可以在使用 PyTorch 编写分布式应用程序中找到。

importos
importsys
importtempfile
importtorch
importtorch.distributedasdist
importtorch.nnasnn
importtorch.optimasoptim
importtorch.multiprocessingasmp

fromtorch.nn.parallelimport DistributedDataParallel as DDP

# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
#    "gloo",
#    rank=rank,
#    init_method=init_method,
#    world_size=world_size)
# For TcpStore, same way as on Linux.

defsetup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

defcleanup():
    dist.destroy_process_group()

现在,让我们创建一个简单的模块,用DDP进行封装,并为其提供一些虚拟输入数据。请注意,由于DDP在构造函数中会将模型状态从rank 0进程广播到所有其他进程,因此您无需担心不同的DDP进程会从不同的初始模型参数值开始。

classToyModel(nn.Module):
    def__init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    defforward(self, x):
        return self.net2(self.relu(self.net1(x)))


defdemo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()
    print(f"Finished running basic DDP example on rank {rank}.")


defrun_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

如您所见,DDP 封装了底层的分布式通信细节,并提供了一个简洁的 API,使得使用起来就像在操作本地模型一样。梯度同步通信在反向传播过程中进行,并与反向计算重叠。当 backward() 返回时,param.grad 已经包含了同步后的梯度张量。对于基本用例,DDP 只需多几行代码来设置进程组。当将 DDP 应用于更高级的用例时,一些注意事项需要小心处理。

处理速度不均衡

在 DDP(分布式数据并行)中,构造函数、前向传播和反向传播是分布式同步点。不同的进程应启动相同数量的同步,并以相同的顺序到达这些同步点,并且在大致相同的时间进入每个同步点。否则,较快的进程可能会提前到达并在等待较慢的进程时超时。因此,用户需要负责平衡各个进程之间的工作负载分布。有时,由于网络延迟、资源争用或不可预测的工作负载峰值等原因,处理速度的偏差是不可避免的。为了避免这些情况下的超时,请确保在调用 init_process_group 时传递足够大的 timeout 值。

保存和加载检查点

在训练过程中,通常使用 torch.savetorch.load 来保存模块的检查点并从检查点恢复。更多详情请参阅 SAVING AND LOADING MODELS。当使用 DDP 时,一种优化方法是只在其中一个进程中保存模型,然后在所有进程中加载它,从而减少写入开销。这种做法是可行的,因为所有进程都从相同的参数开始,并且在反向传播过程中梯度是同步的,因此优化器应该会将参数设置为相同的值。如果使用这种优化方法(即在一个进程中保存但在所有进程中恢复),请确保在保存完成之前没有进程开始加载。此外,在加载模块时,需要提供适当的 map_location 参数,以防止进程进入其他设备。如果缺少 map_locationtorch.load 会首先将模块加载到 CPU,然后将每个参数复制到其保存的位置,这会导致同一台机器上的所有进程使用同一组设备。如需更高级的故障恢复和弹性支持,请参考 TorchElastic

defdemo_checkpoint(rank, world_size):
    print(f"Running DDP checkpoint example on rank {rank}.")
    setup(rank, world_size)

    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])


    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location, weights_only=True))

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)

    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Not necessary to use a dist.barrier() to guard the file deletion below
    # as the AllReduce ops in the backward pass of DDP already served as
    # a synchronization.

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()
    print(f"Finished running DDP checkpoint example on rank {rank}.")

结合 DDP 与模型并行

DDP 也支持多 GPU 模型。在训练包含大量数据的大型模型时,使用 DDP 封装多 GPU 模型特别有帮助。

classToyMpModel(nn.Module):
    def__init__(self, dev0, dev1):
        super(ToyMpModel, self).__init__()
        self.dev0 = dev0
        self.dev1 = dev1
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)

    defforward(self, x):
        x = x.to(self.dev0)
        x = self.relu(self.net1(x))
        x = x.to(self.dev1)
        return self.net2(x)

当将多 GPU 模型传递给 DDP 时,device_idsoutput_device 不能手动设置。输入和输出数据将由应用程序或模型的 forward() 方法自动放置在适当的设备上。

defdemo_model_parallel(rank, world_size):
    print(f"Running DDP with model parallel example on rank {rank}.")
    setup(rank, world_size)

    # setup mp_model and devices for this process
    dev0 = rank * 2
    dev1 = rank * 2 + 1
    mp_model = ToyMpModel(dev0, dev1)
    ddp_mp_model = DDP(mp_model)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    # outputs will be on dev1
    outputs = ddp_mp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(dev1)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()
    print(f"Finished running DDP with model parallel example on rank {rank}.")


if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
    world_size = n_gpus
    run_demo(demo_basic, world_size)
    run_demo(demo_checkpoint, world_size)
    world_size = n_gpus//2
    run_demo(demo_model_parallel, world_size)

使用 torch.distributed.run/torchrun 初始化 DDP

我们可以利用 PyTorch Elastic 来简化 DDP 代码,并更容易地初始化任务。让我们仍然使用 Toymodel 示例,并创建一个名为 elastic_ddp.py 的文件。

importtorch
importtorch.distributedasdist
importtorch.nnasnn
importtorch.optimasoptim

fromtorch.nn.parallelimport DistributedDataParallel as DDP

classToyModel(nn.Module):
    def__init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    defforward(self, x):
        return self.net2(self.relu(self.net1(x)))


defdemo_basic():
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    dist.init_process_group("nccl")
    rank = dist.get_rank()
    print(f"Start running basic DDP example on rank {rank}.")
    # create model and move it to GPU with id rank
    device_id = rank % torch.cuda.device_count()
    model = ToyModel().to(device_id)
    ddp_model = DDP(model, device_ids=[device_id])
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_id)
    loss_fn(outputs, labels).backward()
    optimizer.step()
    dist.destroy_process_group()
    print(f"Finished running basic DDP example on rank {rank}.")

if __name__ == "__main__":
    demo_basic()

随后,可以在所有节点上运行 torch elastic/torchrun 命令来初始化上述创建的 DDP 任务:

torchrun--nnodes=2--nproc_per_node=8--rdzv_id=100--rdzv_backend=c10d--rdzv_endpoint=$MASTER_ADDR:29400elastic_ddp.py

在上面的示例中,我们在两个主机上运行 DDP 脚本,并且每个主机上运行 8 个进程。也就是说,我们正在 16 个 GPU 上运行此任务。请注意,$MASTER_ADDR 必须在所有节点上保持一致。

在这里,torchrun 将启动 8 个进程,并在其启动的节点上的每个进程中调用 elastic_ddp.py,但用户还需要应用像 slurm 这样的集群管理工具来实际在两个节点上运行此命令。

例如,在启用 SLURM 的集群上,我们可以编写一个脚本来运行上述命令,并将 MASTER_ADDR 设置为:

exportMASTER_ADDR=$(scontrolshowhostname${SLURM_NODELIST}|head-n1)

然后我们可以使用 SLURM 命令运行这个脚本:srun --nodes=2 ./torchrun_script.sh

这只是一个示例;您可以选择自己的集群调度工具来启动 torchrun 任务。

有关弹性运行的更多信息,请参阅快速入门文档

本页目录