分布式数据并行
- 类 torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=None, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False, static_graph=False, delay_all_reduce_named_params=None, param_to_hook_all_reduce=None, mixed_precision=None, device_mesh=None)[源代码]
-
基于
torch.distributed
在模块级别实现分布式数据并行。此容器通过在每个模型副本之间同步梯度来提供数据并行性。要同步的设备由输入的
process_group
指定,默认情况下为整个系统。需要注意的是,DistributedDataParallel
不会将输入分块或拆分为参与的 GPU;用户需要自行定义如何进行此操作,例如通过使用DistributedSampler
。另请参阅:基础知识 和 使用 nn.parallel.DistributedDataParallel 而不是 multiprocessing 或 nn.DataParallel。输入的约束条件与
torch.nn.DataParallel
中的一样。创建此类需要先通过调用
torch.distributed.init_process_group()
初始化torch.distributed
。DistributedDataParallel
在单节点多GPU数据并行训练中的速度被证明比torch.nn.DataParallel
更快。要在具有 N 张 GPU 的主机上使用
DistributedDataParallel
,你需要启动 N 个进程,并确保每个进程独占一张从 0 到 N-1 编号的 GPU。这可以通过为每个进程设置CUDA_VISIBLE_DEVICES
环境变量或调用相应函数来实现。>>> torch.cuda.set_device(i)
其中 i 的取值范围是从 0 到 N-1。在每个进程中,应参照以下内容来构建该模块:
>>> torch.distributed.init_process_group( >>> backend='nccl', world_size=N, init_method='...' >>> ) >>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)
要在一个节点上启动多个进程,可以使用
torch.distributed.launch
或torch.multiprocessing.spawn
。注意
请参阅 PyTorch 分布式概述,了解与分布式训练相关的所有功能的简要介绍。
注意
DistributedDataParallel
可以与torch.distributed.optim.ZeroRedundancyOptimizer
结合使用,以减少每节点的优化器状态内存占用。更多详情请参阅 ZeroRedundancyOptimizer 实用技巧。注意
nccl
后端是当前使用 GPU 时最快的后端,强烈推荐。它既适用于单节点也适用于多节点的分布式训练。注意
此模块还支持混合精度分布式训练。这意味着你的模型可以包含不同类型的参数,例如同时使用
fp16
和fp32
类型,在这些混合类型的参数上进行梯度减少操作也能正常运行。注意
如果你在一个进程中使用
torch.save
来保存模块,并在其他进程中使用torch.load
来恢复该模块,确保每个进程中的map_location
配置正确。如果没有设置map_location
,torch.load
会将模块恢复到它被保存时所在的设备。注意
当一个模型在
M
个节点上以batch=N
进行训练时,与在同一节点上使用batch=M*N
进行训练的模型相比,梯度会小M
倍(前提是损失是相加而非通常情况下的平均)。如果损失是在批次中的实例间求和的话,不同节点之间的梯度会被平均。因此,在你希望获得与本地训练过程数学等效的情况下,你应该考虑这一点。但在大多数情况下,你可以将一个由DistributedDataParallel包装的模型、一个由DataParallel包装的模型以及单个GPU上的普通模型视为相同(例如,使用相同的学习率来处理相等的批次大小)。注意
参数永远不会在进程之间广播。该模块对梯度执行全reduce操作,并假设所有进程将以相同的方式通过优化器修改这些梯度。缓冲区(例如批量归一化统计信息)从rank为0的进程中广播到系统中的所有其他副本,每次迭代都会进行广播。
注意
如果你在使用分布式数据并行(分布式 RPC 框架),你应该始终使用
torch.distributed.autograd.backward()
来计算梯度,并使用torch.distributed.optim.DistributedOptimizer
优化参数。示例:
>>> import torch.distributed.autograd as dist_autograd >>> from torch.nn.parallel import DistributedDataParallel as DDP >>> import torch >>> from torch import optim >>> from torch.distributed.optim import DistributedOptimizer >>> import torch.distributed.rpc as rpc >>> from torch.distributed.rpc import RRef >>> >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> rref = rpc.remote("worker1", torch.add, args=(t1, t2)) >>> ddp_model = DDP(my_model) >>> >>> # Setup optimizer >>> optimizer_params = [rref] >>> for param in ddp_model.parameters(): >>> optimizer_params.append(RRef(param)) >>> >>> dist_optim = DistributedOptimizer( >>> optim.SGD, >>> optimizer_params, >>> lr=0.05, >>> ) >>> >>> with dist_autograd.context() as context_id: >>> pred = ddp_model(rref.to_here()) >>> loss = loss_func(pred, target) >>> dist_autograd.backward(context_id, [loss]) >>> dist_optim.step(context_id)
注意
DistributedDataParallel 当前对使用
torch.utils.checkpoint()
进行梯度检查点的支持有限。如果使用 use_reentrant=False(推荐)进行检查点操作,DDP 将按预期工作而没有任何限制。然而,如果使用 use_reentrant=True(默认值)进行检查点操作,并且模型中没有未使用的参数且每个层最多只被检查一次时(确保你没有将 find_unused_parameters=True 传递给 DDP),DDP 将正常工作。我们目前不支持一个层多次被检查点化或在检查点化的模型中有未使用参数的情况。注意
为了使一个非DDP模型能够加载来自DDP模型的状态字典,需要先使用
consume_prefix_in_state_dict_if_present()
函数移除DDP状态字典中的“module.”前缀,然后再进行加载。警告
构造函数、前向方法以及该模块输出(或其输出的函数)的微分是分布式同步点。请注意,不同进程可能会执行不同的代码。
警告
此模块假设所有参数在创建时已注册到模型中。之后不得添加或删除任何参数。缓冲区也遵循同样的规则。
警告
此模块假设所有参数在每个分布式进程的模型中都已注册,并且这些参数的顺序一致。该模块将按照模型中注册参数的反向顺序执行梯度
allreduce
操作。换句话说,用户需要确保每个分布式进程具有完全相同的模型和参数注册顺序。警告
此模块支持非行优先连续步长的参数。例如,你的模型中的一些参数可能使用
torch.memory_format
的torch.contiguous_format
格式,而其他参数则使用torch.channels_last
格式。然而,在不同进程中的相应参数必须具有相同的步长。警告
此模块不与
torch.autograd.grad()
配合使用(也就是说,它仅在梯度累加到参数的.grad
属性时才有效)。警告
如果你计划使用此模块与
nccl
后端或使用 Infiniband 的gloo
后端(并结合使用多个工作进程的数据加载器),请将多进程启动方法更改为forkserver
(仅限 Python 3)或spawn
。不幸的是,Gloo(使用 Infiniband 的版本)和 NCCL2 不支持 fork 操作,并且如果你不更改此设置,可能会遇到死锁问题。警告
你不应该在使用
DistributedDataParallel
封装你的模型后尝试修改模型的参数。因为在封装过程中,DistributedDataParallel
的构造函数会在构建时为其所有参数注册额外的梯度减少函数。如果之后更改了这些参数,那么梯度减少函数将不再与正确的参数集匹配。警告
结合使用
DistributedDataParallel
和分布式RPC框架是实验性的,可能随时发生变化。- 参数
-
-
module (Module) – 需要进行并行处理的模块
-
device_ids (列表 of 整数 or torch.device) –
CUDA 设备:1)对于单个设备的模块,
device_ids
应包含一个且仅一个设备 ID,表示该进程对应的输入模块所在的唯一 CUDA 设备。此外,device_ids
也可以是None
。2)对于多设备模块和 CPU 模块,device_ids
必须为None
。当两个情况下的
device_ids
均为None
时,前向传递的输入数据和实际模块都必须放在正确的设备上。(默认值:None
) -
output_device (int 或 torch.device) – 单个 CUDA 设备模块的输出设备位置。对于多设备模块和 CPU 模块,该值必须为
None
,并且由模块本身决定输出位置。(默认值:单个设备模块的device_ids[0]
) -
broadcast_buffers (bool) – 一个标志,用于在
forward
函数开始时同步(广播)模块的缓冲区。默认值为True
。 -
process_group - 用于分布式数据全约简的过程组。如果设置为
None
,则使用由torch.distributed.init_process_group()
创建的默认过程组。(默认值:None
) -
bucket_cap_mb –
DistributedDataParallel
会将参数分成多个桶,使得每个桶的梯度减少可以与反向计算重叠。bucket_cap_mb
控制桶的大小(以梅比字节MiB为单位)。如果设置为None
,则默认使用25 MiB。(默认值:None
) -
find_unused_parameters (bool) – 遍历包装模块的
forward
函数返回值中包含的所有张量的自动求导图。对于那些在该图中没有梯度的部分参数,会提前标记为可以被减少的状态。此外,那些可能在包装模块的forward
函数中使用但未参与损失计算因而也不会有梯度的参数也会被提前标记为可以被减少的状态。(默认值:False
) -
check_reduction - 此参数已 deprecated。
-
gradient_as_bucket_view (bool) – 当设置为
True
时,梯度将是指向不同偏移量的视图,这些偏移量位于allreduce
通信桶中。这可以减少峰值内存使用量,并节省等于总梯度大小的内存量。此外,它避免了在梯度和allreduce
通信桶之间复制数据的开销。当梯度为视图时,不能对梯度调用detach_()
方法。如果遇到此类错误,请参考torch/optim/optimizer.py
中的zero_grad()
函数作为解决方案。注意,梯度在第一次迭代后将为视图,因此峰值内存节省应在第一次迭代之后进行检查。 -
static_graph (bool) - 是否使用静态图
当设置为
True
时,DDP 知道训练的图是静态的。这意味着:
1) 在整个训练过程中,使用的和未使用的参数集合不会改变;因此,是否设置find_unused_parameters = True
并不重要。
2) 图在整个训练过程中的训练方式不会改变(即没有依赖于迭代次数的控制流)。
当静态图被设置为True
时,DDP 可以支持以下过去无法实现的情况:
1) 重新进入反向传播。
2) 多次激活检查点。
3) 模型有未使用参数时进行激活检查点。
4) 前向函数之外存在模型参数。
5) 当静态图被设置为True
时,当存在未使用的参数时可能提高性能,因为 DDP 不会在每次迭代中搜索图来检测未使用的参数。要检查是否可以将静态图设置为True
,一种方法是在您之前的模型训练结束时检查 ddp 日志数据;如果ddp_logging_data.get("can_set_static_graph") == True
,通常您可以将static_graph = True
。- 示例:
-
>>> model_DDP = torch.nn.parallel.DistributedDataParallel(model) >>> # Training loop >>> ... >>> ddp_logging_data = model_DDP._get_ddp_logging_data() >>> static_graph = ddp_logging_data.get("can_set_static_graph")
-
delay_all_reduce_named_params (列表 of 元组 of str 和 torch.nn.Parameter) – 一个包含命名参数的列表,这些参数的 all reduce 操作将在
param_to_hook_all_reduce
参数的梯度准备好时被延迟。DDP 的其他参数不适用于此参数中指定的命名参数,因为 DDP 减量器会忽略这些命名参数。 -
param_to_hook_all_reduce (torch.nn.Parameter) – 用于挂载在
delay_all_reduce_named_params
中指定的参数上的延迟 All Reduce 操作的参数。
-
- 变量
-
module (Module) – 需要进行并行处理的模块。
示例:
>>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...') >>> net = torch.nn.parallel.DistributedDataParallel(model)
- join(divide_by_initial_world_size=True, enable=True, throw_on_early_termination=False)[源代码]
-
用于在DDP中处理不同进程之间输入不均匀的上下文管理器。
这个上下文管理器会跟踪已加入的DDP进程,并在前向和后向传递中插入集体通信操作来“影子”匹配未加入的DDP进程创建的操作。这确保每个集体调用都有一个对应的已加入DDP进程的调用,从而防止由于跨进程输入不均匀导致的挂起或错误。或者,如果标志
throw_on_early_termination
被设置为True
,一旦某个进程耗尽了输入,所有训练器都将抛出一个错误,允许根据应用程序逻辑捕获和处理这些错误。一旦所有DDP进程都已加入,上下文管理器会将最后一个加入进程的模型广播给所有进程,确保每个进程中的模型都是相同的(这一点由DDP保证)。
要使用此方法启用不同进程间输入不均匀的训练,只需将此上下文管理器应用于你的训练循环。无需对模型或数据加载做任何进一步的修改。
警告
如果此上下文管理器包装的模型或训练循环包含额外的分布式集体操作(例如,在前向传递中的
SyncBatchNorm
),则必须启用标志throw_on_early_termination
。这是因为该上下文管理器不支持非DDP的集体通信。当任何一端耗尽输入时,此标志会使得所有端抛出异常,从而允许在所有端上捕获并恢复这些错误。- 参数
-
-
divide_by_initial_world_size (bool) – 如果设置为
True
,则在初始的world_size
启动 DDP 训练时将梯度除以该值。如果设置为False
,则通过计算剩余的有效节点数(尚未耗尽输入的节点数量)来划分梯度,并在 allreduce 期间进行调整。设置divide_by_initial_world_size=True
可确保每个输入样本(包括不均匀的输入)对全局梯度的贡献权重相等,这是通过始终将梯度除以初始的world_size
来实现的,即使遇到不均匀的输入也是如此。如果将其设置为False
,则会将梯度除以剩余节点的数量,这确保了与较小的world_size
训练的一致性,尽管这也意味着不均匀的输入会对全局梯度贡献更多。通常,在训练任务中最后几个输入是不均匀的情况下,你希望将其设置为True
。在极端情况下,如果输入数量差异很大,则将此值设为False
可能会提供更好的结果。 -
enable (bool) – 是否启用不均匀输入检测。如果确定参与进程之间的输入是均匀的,可以通过设置
enable=False
来禁用此功能。默认值为True
。 -
throw_on_early_termination (bool) – 是否在至少一个进程耗尽输入时抛出错误或继续训练。如果设置为
True
,则当第一个进程到达数据末尾时会抛出异常并停止训练。如果设置为False
,则会在所有进程加入之前以较小的有效世界大小继续训练。需要注意的是,如果设置了此标志,则忽略标志divide_by_initial_world_size
。默认值是False
。
-
示例:
>>> import torch >>> import torch.distributed as dist >>> import os >>> import torch.multiprocessing as mp >>> import torch.nn as nn >>> # On each spawned worker >>> def worker(rank): >>> dist.init_process_group("nccl", rank=rank, world_size=2) >>> torch.cuda.set_device(rank) >>> model = nn.Linear(1, 1, bias=False).to(rank) >>> model = torch.nn.parallel.DistributedDataParallel( >>> model, device_ids=[rank], output_device=rank >>> ) >>> # Rank 1 gets one more input than rank 0. >>> inputs = [torch.tensor([1]).float() for _ in range(10 + rank)] >>> with model.join(): >>> for _ in range(5): >>> for inp in inputs: >>> loss = model(inp).sum() >>> loss.backward() >>> # Without the join() API, the below synchronization will hang >>> # blocking for rank 1's allreduce to complete. >>> torch.cuda.synchronize(device=rank)
- join_hook(**kwargs)[源代码]
-
DDP join hook 通过在前向和后向传递中镜像通信,使不均匀输入的训练变得可行。
- 参数
-
kwargs (dict) – 一个包含关键字参数的字典,用于在运行时修改连接钩子的行为;所有共享相同连接上下文管理器的
Joinable
实例都会收到相同的kwargs
值。
- 挂钩支持以下关键词参数:
-
- divide_by_initial_world_size (bool, 可选):
-
如果设置为
True
,梯度将被初始的 DDP 启动时的世界大小除;若设置为False
,则梯度会被有效世界大小(即未加入的过程数量)除。这意味着不均匀输入对全局梯度的影响会更大。通常情况下,如果不均匀程度较小,则应将此值设为True
;在极端情况下可以将其设为False
以可能获得更好的结果。默认设置是True
。
- no_sync()[源代码]
-
用于禁止DDP进程中梯度同步的上下文管理器。
在此上下文中,梯度将累积在模块变量上,并在第一次离开该上下文的正向和反向传递过程中进行同步。
示例:
>>> ddp = torch.nn.parallel.DistributedDataParallel(model, pg) >>> with ddp.no_sync(): >>> for input in inputs: >>> ddp(input).backward() # no synchronization, accumulate grads >>> ddp(another_input).backward() # synchronize grads
警告
前向传递应放在上下文管理器内,否则梯度仍会被同步。
- register_comm_hook(state, hook)[源代码]
-
为用户定义的 DDP 跨多个工作进程聚合梯度,注册通信钩子。
这个钩子对研究人员尝试新想法非常有帮助。例如,可以使用它来实现一些算法,比如 GossipGrad 和梯度压缩等,在进行分布式数据并行训练时采用不同的通信策略来进行参数同步。
- 参数
-
-
state (对象) –
在训练过程中,传递给钩子以维护状态信息。例如,在梯度压缩中提供错误反馈,在 GossipGrad 中确定下一个通信的对等节点。
它由每个工作者本地存储,并被该工作者上的所有梯度张量共享。
-
hook (Callable) –
具有以下签名的可调用对象:
hook(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]
:当桶准备就绪时,会调用此函数。该钩子可以执行任何必要的处理,并返回一个表示异步工作(如 allreduce)已完成的 Future 对象。即使钩子不进行任何通信,也必须返回一个已完成的 Future 对象。Future 对象应包含 grad 桶张量的新值。一旦桶准备好,c10d reducer 会调用此钩子,并使用 Future 返回的张量将梯度复制到各个参数中。需要注意的是,future 的返回类型必须是一个单一的张量。
我们还提供了一个名为
get_future
的API,用于检索与c10d.ProcessGroup.Work
完成相关的Future对象。当前,get_future
支持NCCL,并且在GLOO和MPI上的大多数操作中也得到了支持,但不包括进程间通信操作(如send/recv)。
-
警告
Grad bucket的张量不会预先除以world_size。在进行类似allreduce的操作时,用户需自行完成除以world_size的步骤。
警告
DDP通信钩子只能注册一次,且应在调用backward之前完成注册。
警告
hook 返回的 Future 对象应该包含一个与梯度桶内张量形状相同的单一张量。
警告
get_future
API 支持 NCCL,并部分支持 GLOO 和 MPI 后端(不支持如 send/recv 这样的点对点操作),最终将返回一个torch.futures.Future
对象。- 示例:
-
以下是一个示例 noop 钩子,返回相同的张量。
>>> def noop(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]: >>> fut = torch.futures.Future() >>> fut.set_result(bucket.buffer()) >>> return fut >>> ddp.register_comm_hook(state=None, hook=noop)
- 示例:
-
下面是一个并行SGD算法的示例:在执行allreduce之前对梯度进行编码,在执行allreduce之后对其进行解码。
>>> def encode_and_decode(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]: >>> encoded_tensor = encode(bucket.buffer()) # encode gradients >>> fut = torch.distributed.all_reduce(encoded_tensor).get_future() >>> # Define the then callback to decode. >>> def decode(fut): >>> decoded_tensor = decode(fut.value()[0]) # decode gradients >>> return decoded_tensor >>> return fut.then(decode) >>> ddp.register_comm_hook(state=None, hook=encode_and_decode)