分布式通信包 - torch.distributed

注意

请参阅 PyTorch 分布式概述,了解与分布式训练相关的所有功能的简要介绍。

后端

torch.distributed 支持三种内置后端,每个后端有不同的功能。下表展示了这些功能在 CPU 和 CUDA 张量中的可用性。需要注意的是,MPI 只有在用于构建 PyTorch 的实现支持 CUDA 时才支持 CUDA。

后端

gloo

mpi

nccl

设备

中央处理器

GPU

中央处理器

GPU

中央处理器

GPU

发送

?

接收数据(recv)

?

广播

?

all_reduce

?

减少

?

all_gather

?

收集

?

散开

?

reduce_scatter

全员参与

?

屏障

?

PyTorch自带的后端

PyTorch 分布式包支持 Linux(稳定版)、MacOS(稳定版)和 Windows(原型)。默认情况下,对于 Linux 系统,Gloo 和 NCCL 后端会被构建并包含在 PyTorch 分布式中(NCCL 仅在使用 CUDA 构建时被包含)。MPI 是一个可选后端,只有在从源代码构建 PyTorch 时才能添加。(例如,在已经安装了 MPI 的主机上构建 PyTorch。)

注意

从 PyTorch v1.8 开始,Windows 支持所有集体通信后端(除了 NCCL)。如果 init_method 参数指向一个文件,则该文件必须遵循以下格式:

  • 本地文件系统,例如 init_method="file:///d:/tmp/some_file"

  • 共享文件系统,初始化方法为:init_method="file://////{machine_name}/{share_folder_name}/some_file"

与在 Linux 平台一样,你可以通过设置环境变量 MASTER_ADDR 和 MASTER_PORT 来启用 TcpStore。

选择哪个后端?

在过去,我们常常会被问到:“我应该使用哪个后端?”

  • 经验法则

    • 使用 NCCL 后端进行分布式 GPU 训练

    • 使用 Gloo 后端进行分布式 CPU 训练。

  • 配备InfiniBand互连的GPU主机

    • 使用 NCCL,因为它是目前唯一支持 InfiniBand 和 GPUDirect 的后端。

  • 配备以太网互连的 GPU 主机

    • 使用 NCCL,因为它目前提供了最佳的分布式 GPU 训练性能,特别是对于多进程单节点或多节点分布式训练。如果遇到任何与 NCCL 相关的问题,请将 Gloo 作为备用选项。需要注意的是,Gloo 在 GPU 上的运行速度比 NCCL 慢。

  • 配备InfiniBand互连的CPU主机

    • 如果您的InfiniBand启用了IP over IB功能,请使用Gloo;否则请使用MPI。我们计划在未来发布的版本中为Gloo增加对InfiniBand的支持。

  • 通过以太网互连的CPU主机

    • 除非有特定原因使用MPI,否则建议使用Gloo。

常见环境变量

选择网络接口

默认情况下,NCCL 和 Gloo 后端会自动查找合适的网络接口。如果自动检测到的接口不正确,你可以通过设置相应的环境变量来覆盖它。

  • NCCL_SOCKET_IFNAME,例如:export NCCL_SOCKET_IFNAME=eth0

  • GLOO_SOCKET_IFNAME,例如:export GLOO_SOCKET_IFNAME=eth0

如果你使用的是Gloo后端,可以通过逗号分隔来指定多个接口,例如:export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3。后端将在这些接口上以轮询方式分配操作。所有进程都必须在这个变量中指定相同数量的接口。

其他 NCCL 环境变量

调试 - 如果出现 NCCL 失败的情况,可以将 NCCL_DEBUG=INFO 设置为打印出具体的警告信息和基本的 NCCL 初始化详情。

你还可以使用 NCCL_DEBUG_SUBSYS 来获取 NCCL 特定方面的更多详细信息。例如,NCCL_DEBUG_SUBSYS=COLL 会打印聚合调用的日志,在调试挂起问题时可能会有所帮助,特别是那些由集合类型或消息大小不匹配引起的挂起问题。如果拓扑检测失败,可以设置 NCCL_DEBUG_SUBSYS=GRAPH 来检查详细的检测结果,并保存以备需要从 NCCL 团队获得进一步帮助时参考。

性能调优 - NCCL 基于拓扑检测进行自动优化,以减少用户的调优工作。在某些基于插座的系统中,用户仍可以尝试调整 NCCL_SOCKET_NTHREADSNCCL_NSOCKS_PERTHREAD 来增加网络带宽。这两个环境变量已经在一些云服务提供商(如 AWS 或 GCP)上进行了预调优。

关于 NCCL 环境变量的完整列表,请参考NVIDIA NCCL 官方文档

基础知识

torch.distributed 包提供了跨多台机器上的多个计算节点的多进程并行支持和通信原语。类 torch.nn.parallel.DistributedDataParallel() 基于此功能,为任何 PyTorch 模型提供同步分布式训练的包装器。这与 多进程包 - torch.multiprocessingtorch.nn.DataParallel() 提供的功能不同,因为它支持多个网络连接的机器,并且用户必须为每个进程显式启动主训练脚本的不同副本。

在单机同步情况下,torch.distributed 或者 torch.nn.parallel.DistributedDataParallel() 包装器可能仍然比其他数据并行方法(如torch.nn.DataParallel())具有优势:

  • 每个进程维护自己的优化器,并在每次迭代中执行完整的优化步骤。虽然梯度已经在各进程中聚集并平均化,使得每个进程的梯度相同,但这样可以省去参数广播步骤,从而减少在节点之间传输张量所需的时间。

  • 每个进程都包含一个独立的Python解释器,消除了从单个Python进程驱动多个执行线程、模型副本或GPU所带来的额外开销和“GIL争用”。这对于大量使用Python运行时的模型尤其重要,例如具有递归层或多小组件的模型。

初始化

在调用任何其他方法之前,需要使用torch.distributed.init_process_group()torch.distributed.device_mesh.init_device_mesh() 函数来初始化该包。这两个函数会阻塞,直到所有进程都加入。

torch.distributed.is_available()[源代码]

如果分布式包可用,返回 True

否则,torch.distributed 不会暴露其他任何 API。目前,torch.distributed 支持 Linux、MacOS 和 Windows 系统。从源代码构建 PyTorch 时,设置 USE_DISTRIBUTED=1 来启用它。当前,默认情况下,Linux 和 Windows 的值为 USE_DISTRIBUTED=1,而 MacOS 的默认值为 USE_DISTRIBUTED=0

返回类型

bool

torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[源代码]

初始化默认的分布式进程组。

这还将初始化分布式包。

初始化进程组主要有以下两种方法:
  1. 请明确指定 storerankworld_size

  2. 指定 init_method(一个 URL 字符串),用于指示如何发现对等节点。可选地设置 rankworld_size 参数,或者将所有必需的参数编码在 URL 中并省略这些参数。

如果没有指定,默认的 init_method 为 “env://”。

参数
  • backend (strBackend, 可选) – 使用的后端。根据构建时配置的不同,有效值包括 mpiglooncclucc。如果没有指定后端,则会同时创建一个 gloo 后端和一个 nccl 后端,具体管理多个后端的方法请参见下文说明。此字段可以作为小写字符串(例如 "gloo")给出,也可以通过 Backend 属性访问(例如 Backend.GLOO)。如果使用 nccl 后端的多进程,则每个进程必须对其使用的 GPU 有独占访问权限,否则跨进程共享 GPU 可能会导致死锁。ucc 后端是实验性的。

  • init_method (str, optional) – 指定如何初始化进程组的 URL。默认值为“env://”,如果未指定 init_methodstore。与 store 互斥。

  • world_size (int, optional) – 参与任务的进程数量。如果指定了 store,则该参数是必需的。

  • rank (int, 可选) – 当前进程的排名(应为0到world_size-1之间的数字)。如果指定了store,则此参数是必需的。

  • store (Store, 可选) – 一个所有工作者都可以访问的键值存储,用于交换连接和地址信息。与init_method互斥。

  • timeout (timedelta, 可选) – 针对进程组执行的操作的超时时间。默认值为 NCCL 的 10 分钟和其他后端的 30 分钟。这是在集体操作被异步中止并导致进程崩溃之前的持续时间。由于 CUDA 执行是异步的,一旦出现失败的异步 NCCL 操作,继续执行用户代码将不再安全(因为后续的 CUDA 操作可能会在损坏的数据上运行)。当设置了 TORCH_NCCL_BLOCKING_WAIT 时,进程将会阻塞并等待超时。

  • group_name (str, 可选, 已弃用) – 组名。此参数会被忽略。

  • pg_options (ProcessGroupOptions, 可选) – 指定在构建特定进程组时需要传递的额外选项。目前,我们仅支持nccl后端的ProcessGroupNCCL.Options,可以设置is_high_priority_stream参数,以便在有计算内核等待时,nccl后端可以选择高优先级的 CUDA 流。有关其他可用选项以配置 NCCL,请参见https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t

  • device_id (torch.device, 可选) – 绑定到特定设备,允许进行后端特定的优化。目前在 NCCL 中有两个效果:通信器会立即初始化(直接调用 ncclCommInit* 而不是通常的惰性调用),并且子组将尽可能使用 ncclCommSplit 来避免不必要的创建开销。如果你想提前发现 NCCL 初始化错误,也可以使用此字段。

注意

要启用backend == Backend.MPI,需要在支持 MPI 的系统上从源代码编译 PyTorch。

注意

多后端支持目前处于实验阶段。如果没有明确指定后端,默认会创建 gloonccl 两个后端。对于使用 CPU 张量的集合操作,将采用 gloo 后端;而对于使用 CUDA 张量的操作,则会使用 nccl 后端。可以通过传递格式为“<设备类型>:<后端名称>,<设备类型>:<后端名称>”的字符串来指定自定义后端,例如 “cpu:gloo,cuda:custom_backend”。

torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None)[源代码]

基于device_typemesh_shapemesh_dim_names参数初始化一个DeviceMesh

这会创建一个具有 n 维数组布局的 DeviceMesh,其中 n 是 mesh_shape 的长度。如果提供了 mesh_dim_names,每个维度将被标记为 mesh_dim_names[i]

注意

init_device_mesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序会在集群中的所有进程/排名上运行。确保 mesh_shape(描述设备布局的 nD 数组的维度)在所有排名中保持一致。如果不一致,mesh_shape 可能会导致程序挂起。

注意

如果未找到进程组,init_device_mesh 将在后台自动初始化用于分布式通信的分布式进程组。

参数
  • device_type (str) – 网格的设备类型。当前支持:“cpu”和“cuda/cuda-like”。不允许传入带有GPU索引的设备类型,例如“cuda:0”。

  • mesh_shape (Tuple[int]) – 描述设备布局的多维数组维度的元组。

  • mesh_dim_names (Tuple[str], optional) – 一个包含网格维度名称的元组,用于分配给描述设备布局的多维数组的每个维度。其长度必须与mesh_shape相同,并且mesh_dim_names中的每个字符串都必须是唯一的。

返回值

表示设备布局的 DeviceMesh 对象。

返回类型

DeviceMesh

示例:
>>> from torch.distributed.device_mesh import init_device_mesh
>>>
>>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,))
>>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
torch.distributed.is_initialized()[源代码]

检查默认进程组是否已经初始化。

返回类型

bool

torch.distributed.is_mpi_available()[源代码]

检查 MPI 后端是否可用。

返回类型

bool

torch.distributed.is_nccl_available()[源代码]

检查是否可以使用NCCL后端。

返回类型

bool

torch.distributed.is_gloo_available()[源代码]

检查 Gloo 后端是否可用。

返回类型

bool

torch.distributed.is_torchelastic_launched()[源代码]

检查该进程是否通过 torch.distributed.elastic(即 torchelastic)启动。

环境变量 TORCHELASTIC_RUN_ID 的存在被用作一个标志,以判断当前进程是否由 torchelastic 启动。由于 TORCHELASTIC_RUN_ID 映射到会话 ID,并且该 ID 总是非空值,表示作业 ID 用于对等发现,因此这是一个合理的标志。

返回类型

bool


目前支持三种初始化方法:

TCP 初始化

使用TCP进行初始化有兩種方式,都需要一個所有進程都能訪問的網絡地址和期望的 world_size。第一种方式需要指定rank 0进程的地址。这种初始化方法要求所有进程都必须手动指定rank。

请注意,最新版本的分布式包不再支持多播地址,并且 group_name 已被弃用。

import torch.distributed as dist

# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
                        rank=args.rank, world_size=4)

共享文件系统初始化

另一种初始化方法使用一个在组内所有机器上共享且可见的文件系统,并指定所需的world_size。URL应以file://开头,指向现有目录中不存在的一个新文件。如果文件不存在,文件系统会自动创建它,但不会删除该文件。因此,在下次调用init_process_group()之前,你需要确保清理这个文件。

请注意,最新版本的分布式包不支持自动排名分配,并且group_name也已被弃用。

警告

该方法假设文件系统支持使用 fcntl 进行锁操作,大多数本地系统和 NFS 都支持这一点。

警告

此方法会始终创建一个文件,并尽力在程序结束时清理并删除该文件。换句话说,每次使用文件初始化方法进行初始化时,都需要一个新的空文件才能成功完成初始化。如果再次使用上一次初始化所用的同一个未被清理的文件,则会导致意外行为和可能的死锁或失败。因此,尽管此方法会尽力清理文件,但如果自动删除不成功,你需要确保在训练结束时移除该文件以防止其在下次再被重复使用。这一点尤其重要,如果你计划多次调用init_process_group() 并且使用相同的文件名。换句话说,如果文件没有被删除/清理,并且你再次对该文件调用init_process_group() ,则可能会导致失败。这里的经验法则是,在每次调用init_process_group() 时,确保文件不存在或为空。

import torch.distributed as dist

# rank should always be specified
dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile',
                        world_size=4, rank=args.rank)

环境变量初始化

此方法会从环境变量中读取配置,从而允许您完全自定义信息的获取方式。需要设置的变量有:

  • MASTER_PORT - 必须是一个在 rank 0 的机器上未被使用的空闲端口

  • MASTER_ADDR - 必填(rank 0 除外);rank 0 节点的地址

  • WORLD_SIZE - 必需;可以在下面两种方式中任选其一进行设置:在这里设置或是在调用初始化函数时设置。

  • RANK - 必需;可以在当前位置设置,或者在调用初始化函数时进行设置。

排名为0的机器将用于建立所有的连接。

这是默认设置,意味着无需指定init_method(或者可以将其设为env://)。

初始化完成后

一旦运行了torch.distributed.init_process_group(),就可以使用以下函数。要检查进程组是否已经初始化,请使用torch.distributed.is_initialized()

torch.distributed.Backend(name)[源代码]

类似于枚举的后端类。

可用的后端包括:GLOO、NCCL、UCC、MPI 以及其他已注册的后端。

此类的值是小写的字符串,例如 "gloo"。这些值可以像属性一样进行访问,例如 Backend.NCCL

可以直接调用此类来解析字符串,例如Backend(backend_str)会检查backend_str是否有效,并在有效的情况下返回解析后的小写字符串。它也接受大写的字符串输入,例如Backend("GLOO") 返回 "gloo"

注意

条目 Backend.UNDEFINED 存在,但仅作为某些字段的初始值使用。用户不应直接使用它,也不应依赖它的存在。

classmethodregister_backend(name, func, extended_api=False, devices=None)[源代码]

使用给定的名称和实例化函数来注册一个新的后端。

此方法被第三方的 ProcessGroup 扩展用来注册新的后端。

参数
  • name (str) – ProcessGroup 扩展的后端名称,应与 init_process_group() 中的一致。

  • func (函数)– 用于实例化后端的函数处理器。该函数应在后端扩展中实现,并接受四个参数:storerankworld_sizetimeout

  • extended_api (bool, 可选) – 指定后端是否支持扩展的参数结构。默认值为False。如果设置为True,后端将接收一个 实例,并根据后端的具体实现获取进程组选项对象。

  • device (strlist of str, 可选) – 后端支持的设备类型,例如“cpu”或“cuda”。如果为None,则默认同时支持“cpu”和“cuda”。

注意

对第三方后端的支持处于试验阶段,可能随时发生变化。
torch.distributed.get_backend(group=None)[源代码]

返回指定进程组的后端。

参数

group (ProcessGroup, 可选) - 要操作的进程组。默认为通用的主要进程组。如果指定了其他特定组,则调用该进程必须是 group 的一部分。

返回值

给定进程组的后端以小写字符串的形式表示。

返回类型

后端

torch.distributed.get_rank(group=None)[源代码]

返回当前进程在提供的group中的排名,默认情况下返回默认值。

排名是分配给分布式进程组中每个进程的唯一标识符,范围从0到world_size的连续整数。

参数

group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

返回值

如果进程不在该组中,则进程组的排名为-1

返回类型

int

torch.distributed.get_world_size(group=None)[源代码]

返回当前进程组中进程的数量。

参数

group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

返回值

如果不是进程组的一部分,则世界大小为-1;如果是,则为该进程组的世界大小。

返回类型

int

关闭

在退出时,通过调用destroy_process_group()来清理资源是非常重要的。

要遵循的最简单模式是在训练脚本中不再需要通信的地方(通常在主函数末尾附近)调用 destroy_process_group(),并将 group 参数设为默认值 None,以销毁每个进程组和后端。此调用应由每个 trainer 进程单独执行,而不是在外层进程启动器级别。

如果在超时时间内,pg 中的所有进程都没有调用 destroy_process_group(),特别是在应用程序中有多个进程组(例如用于 N 维并行性)的情况下,在退出时可能会出现挂起的情况。这是因为 ProcessGroupNCCL 的析构函数会调用 ncclCommAbort,而后者必须集体调用。然而,如果由 Python 的垃圾回收器来调用 ProcessGroupNCCL 的析构函数,则其调用顺序是不确定的。通过调用 destroy_process_group() 可以确保在所有进程中一致地调用 ncclCommAbort,并且避免在 ProcessGroupNCCL 的析构函数中调用 ncclCommAbort。

重新初始化

destroy_process_group 还可以用于销毁单独的进程组。例如,在容错训练中,进程组可能在运行时被销毁并重新初始化一个新的进程组。在这种情况下,调用 destroy 后且在后续初始化之前,使用除 torch.distributed 基本原语之外的方法同步训练器进程至关重要。由于实现此同步较为困难,目前尚不支持/未测试这种行为,并将其视为已知问题。如果这种情况阻碍了你的工作,请提交一个 github 问题或 RFC。


分布式键值存储

分布式包自带了一个分布式键值存储,可以用于在组内的进程之间共享信息,并且可以通过torch.distributed.init_process_group() 初始化分布式包(通过显式创建存储作为指定init_method的替代方案)。键值存储有三种选择: TCPStoreFileStoreHashStore

torch.distributed.Store

所有存储实现的基类,例如 PyTorch 分布式提供的三种:TCPStoreFileStoreHashStore

torch.distributed.TCPStore

基于 TCP 的分布式键值存储实现。服务器端负责存储数据,而客户端可以通过 TCP 连接到服务器,并执行诸如 set()(插入键值对)和 get()(检索键值对)等操作。始终需要初始化一个服务器实例,因为客户端会等待与服务器建立连接。

参数
  • host_name (str) – 服务器存储应该运行的主机名或IP地址。

  • port (int) – 服务器应监听的传入请求的端口号。

  • world_size (int, 可选) – 存储系统的总用户数(客户端数量加1作为服务器)。默认值为 None,表示用户数量不固定。

  • is_master (bool, optional) – 当初始化服务器存储时为 True,客户端存储时为 False。默认值为 False。

  • timeout (timedelta, 可选) – 在初始化和调用方法如get()wait() 时使用的超时时间。默认值为 timedelta(seconds=300)

  • wait_for_workers (bool, 可选) – 是否等待所有工作者与服务器存储建立连接。此选项仅在 world_size 为固定值时有效。默认值为 True。

  • multi_tenant (bool, optional) – 如果设置为 True,则当前进程中所有具有相同主机和端口的 TCPStore 实例将共享同一个底层 TCPServer。默认值为 False。

  • master_listen_fd (int, 可选) – 如果指定了该参数,底层的 TCPServer 将会监听这个文件描述符。此描述符必须是一个已经绑定到指定端口的套接字。在某些场景中避免端口分配竞争时很有用。默认值为 None(意味着服务器将创建一个新的套接字并尝试将其绑定到指定端口)。

  • use_libuv (bool, 可选) – 如果为 True,则使用 libuv 作为 TCPServer 的后端。默认值为 True。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Run on process 1 (server)
>>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30))
>>> # Run on process 2 (client)
>>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False)
>>> # Use any of the store methods from either the client or server after initialization
>>> server_store.set("first_key", "first_value")
>>> client_store.get("first_key")
torch.distributed.HashStore

一种基于底层哈希表的线程安全存储实现。该存储可以在同一进程中被其他线程使用,但不支持跨进程使用。

示例:
>>> import torch.distributed as dist
>>> store = dist.HashStore()
>>> # store can be used from other threads
>>> # Use any of the store methods after initialization
>>> store.set("first_key", "first_value")
torch.distributed.FileStore

一种使用文件来存储底层键值对的商店实现。

参数
  • file_name (str) – 指定存储键值对的文件路径

  • world_size (int, 可选) – 表示使用该存储的总进程数。默认值为 -1(负值表示存储用户数量未固定)。

示例:
>>> import torch.distributed as dist
>>> store1 = dist.FileStore("/tmp/filestore", 2)
>>> store2 = dist.FileStore("/tmp/filestore", 2)
>>> # Use any of the store methods from either the client or server after initialization
>>> store1.set("first_key", "first_value")
>>> store2.get("first_key")
torch.distributed.PrefixStore

这是一个围绕任意一个键值存储(TCPStoreFileStoreHashStore)的包装器,它在将键插入存储时为每个键添加前缀。

参数
  • prefix (str) – 在将每个键插入存储之前,添加到键前面的前缀字符串。

  • store (torch.distributed.store) – 表示底层键值存储的一个对象。

torch.distributed.Store.set(self:torch._C._distributed_c10d.Store, arg0:str, arg1:str)None

根据提供的 keyvalue 将键值对插入到存储中。如果 key 已经存在于存储中,它将会被新的 value 替换。

参数
  • key (str) – 需要添加到存储中的键。

  • value (str) – 与key关联并添加到存储中的值。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # Should return "first_value"
>>> store.get("first_key")
torch.distributed.Store.get(self:torch._C._distributed_c10d.Store, arg0:str)bytes

检索与给定 key 关联的值。如果 key 不存在于存储中,该函数将在初始化存储时定义的 timeout 时间内等待,然后抛出异常。

参数

key (str) – 该函数将返回与该键相关的值。

返回值

如果 key 存在于存储中,那么它关联的值就是这里的重点。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # Should return "first_value"
>>> store.get("first_key")
torch.distributed.Store.add(self:torch._C._distributed_c10d.Store, arg0:str, arg1:int)int

对于给定的key,第一次调用add会在存储中创建一个与该key关联的计数器,并将其初始化为amount。后续使用相同的key再次调用add会将计数器增加指定的amount值。如果已经通过set()在存储中设置了某个键,再对该键调用add()将会抛出异常。

参数
  • key (str) – 指定在存储中需要递增计数器的键。

  • amount (int) – 计数器将被增加的数值。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.add("first_key", 1)
>>> store.add("first_key", 6)
>>> # Should return 7
>>> store.get("first_key")
torch.distributed.Store.compare_set(self:torch._C._distributed_c10d.Store, arg0:str, arg1:str, arg2:str)bytes

根据提供的key将键值对插入到存储中,并在插入前比较expected_valuedesired_value。只有当key对应的expected_value已经存在于存储中,或者expected_value是空字符串时,才会设置desired_value

参数
  • key (str) – 在存储中需要检查的键。

  • expected_value (str) – 插入前需要检查的与key关联的值。

  • desired_value (str) – 要添加到存储中的与key相关的值。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("key", "first_value")
>>> store.compare_set("key", "first_value", "second_value")
>>> # Should return "second_value"
>>> store.get("key")
torch.distributed.Store.wait(*args, **kwargs)

overloaded 函数。

不过,如果严格按照要求只调整为更自然的表达且不增加额外内容的话:

重载的函数。

  1. wait(self: torch._C._distributed_c10d.Store, arg0: list[str]) -> None

等待 keys 中的每个键被添加到存储中。如果在 timeout(在存储初始化时设置)之前并非所有键都已设置,wait 将抛出异常。

参数

keys (列表) – 等待直到在存储中设置的键的列表。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> # This will throw an exception after 30 seconds
>>> store.wait(["bad_key"])
  1. wait(self: torch._C._distributed_c10d.Store, arg0: list[str], arg1: datetime.timedelta) -> None

等待keys中的每个键被添加到存储中,如果在提供的timeout时间内键仍未设置,则抛出异常。

参数
  • keys (列表) – 等待直到在存储中设置的键的列表。

  • timeout (timedelta) – 等待密钥添加的超时时间,在此时间内如果没有添加密钥则抛出异常。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> # This will throw an exception after 10 seconds
>>> store.wait(["bad_key"], timedelta(seconds=10))
torch.distributed.Store.num_keys(self:torch._C._distributed_c10d.Store) int

返回存储中设置的键的数量。请注意,这个数量通常会比通过 set()add() 方法添加的键的数量多一个,因为还有一个额外的键用于协调所有使用该存储的工作者。

警告

当与TCPStore一起使用时,num_keys 返回写入底层文件的键的数量。如果存储被销毁,并且使用相同的文件创建了另一个存储,则原始键将保留下来。

返回值

存储中键的数量。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # This should return 2
>>> store.num_keys()
torch.distributed.Store.delete_key(self:torch._C._distributed_c10d.Store, arg0:str)bool

删除与key关联的键值对。如果删除成功,返回true;如果没有成功,返回false

警告

delete_key API 只被TCPStoreHashStore 支持。使用此 API 与 FileStore 将会导致异常。

参数

key (str) – 从存储中删除的键

返回值

如果 key 被删除,则返回True,否则返回False

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, HashStore can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key")
>>> # This should return true
>>> store.delete_key("first_key")
>>> # This should return false
>>> store.delete_key("bad_key")
torch.distributed.Store.set_timeout(self:torch._C._distributed_c10d.Store, timeout:datetime.timedelta) None

设置商店的默认超时时间。该超时时间用于初始化过程以及 wait()get() 方法中。

参数

timeout (timedelta) – 存储中的超时时间。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set_timeout(timedelta(seconds=10))
>>> # This will throw an exception after 10 seconds
>>> store.wait(["bad_key"])

默认情况下,集合操作在默认组(也称为“世界”)上运行,并要求所有进程参与分布式函数调用。然而,某些工作负载可以从更细粒度的通信中受益。这时,分布式组就派上了用场。new_group() 函数可以用来创建新的组,这些组包含所有进程中的任意子集。它返回一个不透明的组句柄,该句柄可以用作所有集合操作(集合是用于在某些已知编程模式中交换信息的分布式函数)的 group 参数。

torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None)[源代码]

创建一个全新的分布式小组。

此函数要求主组中的所有进程(即分布式作业中涉及的所有进程)都调用该函数,即使它们最终不会成为组的一部分。此外,应在所有进程中按相同的顺序创建这些组。

警告

安全的并发使用:当使用多个进程组和NCCL后端时,用户必须确保所有排名之间的集合操作有全局一致的执行顺序。

如果进程内的多个线程执行集合操作,需要进行显式同步以确保顺序的一致性。

当使用 torch.distributed 通信 API 的异步变体时,会返回一个工作对象,并将通信内核排队到单独的 CUDA 流中,从而允许通信和计算重叠。一旦在一个进程组上发布了其中一个或多个异步操作,则必须在调用 work.wait() 同步它们与其他 CUDA 流之后,才能使用另一个进程组。

更多详情请参阅 同时使用多个 NCCL 通信器

参数
  • ranks (list[int]) – 成员等级的列表。如果为 None,则表示包含所有等级。默认值是 None

  • timeout (timedelta, 可选) – 详情和默认值请参见init_process_group

  • backend (strBackend, 可选) – 要使用的后端。根据构建时的配置,有效值为gloonccl。默认情况下使用全局组相同的后端。此字段应以小写字符串形式给出(例如,"gloo"),也可以通过Backend 属性访问(例如,Backend.GLOO)。如果传递 None,将使用默认进程组对应的后端。默认值为 None

  • pg_options (ProcessGroupOptions, 可选) – 指定在构建特定进程组时需要传递的额外选项。例如,在nccl后端中,可以设置is_high_priority_stream以使进程组使用高优先级的CUDA流。有关配置nccl的其他可用选项,请参见NVIDIA文档

  • use_local_synchronization (bool, 可选) – 在进程组创建结束时执行本地屏障。与之不同的是,非成员等级不需要调用 API 也不需要加入屏障。

  • group_desc (str, 可选) – 描述进程组的字符串。

返回值

一个可以分配给集体调用的分布式组句柄,如果没有在ranks中的排名,则使用

注意:use_local_synchronization 不能与 MPI 一起使用。

注意:虽然在较大的集群和较小的进程组中设置 use_local_synchronization=True 可以显著提高速度,但需要注意的是,这会改变集群行为,因为非成员 rank 不参与组屏障(group barrier())。

注意:使用 use_local_synchronization=True 可能会导致死锁,尤其是在每个进程排名创建多个重叠的过程组时。为了避免这种情况,请确保所有进程排名按照相同的全局顺序进行创建。

torch.distributed.get_group_rank(group, global_rank)[源代码]

将全球排名转换为小组排名。

如果 global_rank 不是 group 的一部分,则会引发 RuntimeError。

参数
  • group (ProcessGroup) – 用于查找相对排名的 ProcessGroup。

  • global_rank (int) — 需要查询的全局排名。

返回值

相对于 groupglobal_rank 组内排名

返回类型

int

注意:在默认进程组中调用此函数会返回身份元素。

torch.distributed.get_global_rank(group, group_rank)[源代码]

将小组排名转化为全球排名。

如果 group_rank 不是 的一部分,则会引发 RuntimeError。

参数
  • group (ProcessGroup) – 查找全局排名的 ProcessGroup。

  • group_rank (int) – 要查询的组的排名。

返回值

group_rank 相对于 group 的全球排名

返回类型

int

注意:在默认进程组中调用此函数会返回身份元素。

torch.distributed.get_process_group_ranks(group)[源代码]

获取与相关的所有排名。

参数

group (ProcessGroup) – 用于获取所有 ranks 的 ProcessGroup。

返回值

按照组排名顺序排列的全球排名列表。

返回类型

List

设备网格

DeviceMesh 是一个更高层次的抽象,管理进程组(或 NCCL 通信器)。它允许用户轻松地创建跨节点和同一节点内的进程组,无需担心如何为不同的子进程组正确设置排名,并且有助于轻松管理这些分布式进程组。init_device_mesh() 函数可以用于根据描述设备拓扑的网格形状创建新的 DeviceMesh。

classtorch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, _init_backend=True)[源代码]

DeviceMesh 表示一个设备网格,设备的布局可以用一个 n 维数组来表示,而这个数组中的每一个值代表默认进程组等级的全局 ID。

DeviceMesh 可用于描述集群中设备的布局,并充当集群内设备列表间通信的代理。

DeviceMesh 可以用作 context 管理器。

注意

DeviceMesh 遵循 SPMD 编程模型,这意味着集群中的所有进程/排名都会运行同一个 PyTorch Python 程序。因此,用户需要确保在所有排名中 mesh 数组(描述设备布局)保持一致。如果不一致,可能会导致静默挂起。

参数
  • device_type (str) – 表示网格的设备类型。目前支持的类型有:“cpu”,“cuda/cuda-like”。

  • mesh (ndarray) – 描述设备布局的多维数组或整数张量,IDs为默认进程组的全局ID。

返回值

表示设备布局的 DeviceMesh 对象。

返回类型

DeviceMesh

以下程序以SPMD方式在每个进程/排名上运行。在这个示例中,有两个主机,每个主机有四个GPU。网格的第一维上的归约将在列(0, 4)和(3, 7)之间进行;网格的第二维上的归约将在行(0, 1, 2, 3)和(4, 5, 6, 7)之间进行。

示例:
>>> from torch.distributed.device_mesh import DeviceMesh
>>>
>>> # Initialize device mesh as (2, 4) to represent the topology
>>> # of cross-host(dim 0), and within-host (dim 1).
>>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])

点对点通信

torch.distributed.send(tensor, dst, group=None, tag=0)[源代码]

同步发送张量。

警告

tag 在 NCCL 后端不被支持。

参数
  • tensor (Tensor) – 需要发送的张量。

  • dst (int) – 目标进程在全球进程组中的编号(与group参数无关)。目标进程的编号不应与当前进程的编号相同。

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • tag (int, 可选) – 匹配远程接收的发送标签

torch.distributed.recv(tensor, src=None, group=None, tag=0)[源代码]

同步接收张量数据。

警告

tag 在 NCCL 后端不被支持。

参数
  • tensor (Tensor) - 需要填充接收到的数据的目标张量。

  • src (int, 可选) – 全局进程组中的源进程排名。如果未指定,则从任何进程接收数据。

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • tag (int, 可选) – 用于匹配本地接收和远程发送的标签

返回值

如果发送者不是组的一部分,则其排名为-1

返回类型

int

isend()irecv() 在使用时会返回分布式请求对象。这些对象的类型通常未指定,因为它们不会被手动创建,但它们保证支持两种方法:

  • is_completed() - 如果操作已经完成,返回 True

  • wait() 会阻塞进程,直到操作完成。一旦 wait() 返回,is_completed() 就会被保证返回 True。

torch.distributed.isend(tensor, dst, group=None, tag=0)[源代码]

异步发送张量。

警告

在请求完成之前修改tensor会导致未定义的行为。

警告

tag 在 NCCL 后端不被支持。

参数
  • tensor (Tensor) – 需要发送的张量。

  • dst (int) – 目标在全球进程组中的秩(与 group 参数无关)

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • tag (int, 可选) – 匹配远程接收的发送标签

返回值

一个分布式请求对象。如果不属于该组,则为None

返回类型

Optional[Work]

torch.distributed.irecv(tensor, src=None, group=None, tag=0)[源代码]

异步接收张量数据。

警告

tag 在 NCCL 后端不被支持。

参数
  • tensor (Tensor) - 需要填充接收到的数据的目标张量。

  • src (int, 可选) – 全局进程组中的源进程排名。如果未指定,则从任何进程接收数据。

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • tag (int, 可选) – 用于匹配本地接收和远程发送的标签

返回值

一个分布式请求对象。如果不属于该组,则为None

返回类型

Optional[Work]

torch.distributed.send_object_list(object_list, dst, group=None, device=None)[源代码]

同步发送object_list中包含的可pickle对象。

类似于send(),但可以传递 Python 对象。需要注意的是,object_list 中的所有对象都必须是可以序列化的(picklable)才能被发送。

参数
  • object_list (List[Any]) – 输入对象的列表,要求每个对象都是可序列化的,并且接收方提供的列表大小必须与发送的一致。

  • dst (int) – 目标 rank,将 object_list 发送到该目标 rank。目标 rank 基于全局进程组(与 group 参数无关)

  • group – (ProcessGroup,可选):指定要操作的进程组。如果不指定,默认会使用系统默认的进程组。默认值为None

  • device (torch.device, 可选) – 如果不为 None,对象会被序列化并转换成张量,在发送前移动到指定的 device。默认值是 None

返回值

注意

对于基于 NCCL 的进程组,对象的内部张量表示必须在通信之前移动到 GPU 设备。设备由 torch.cuda.current_device() 给出,并且用户负责通过 torch.cuda.set_device() 为每个进程设置单独的 GPU。

警告

send_object_list() 隐式地使用了不安全的 pickle 模块。可以构造恶意的 pickle 数据,在反序列化时执行任意代码。因此,仅在完全信任数据的情况下调用此函数。

警告

使用send_object_list()函数传递GPU张量的支持不足且效率低下,因为这会引发从GPU到CPU的数据传输(在序列化过程中)。请考虑改用send()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>>     dist.send_object_list(objects, dst=1, device=device)
>>> else:
>>>     objects = [None, None, None]
>>>     dist.recv_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed.recv_object_list(object_list, src=None, group=None, device=None)[源代码]

同步接收 object_list 中的可pickle对象。

类似于recv(),但可以接收 Python 对象。

参数
  • object_list (List[Any]) – 需要接收的对象列表。该列表的大小必须与发送的列表相同。

  • src (int, optional) – 源 rank,用于发送 object_list。源 rank 基于全局进程组(与 group 参数无关)。如果设置为 None,则从任何 rank 接收,默认值是 None

  • group – (ProcessGroup,可选):指定要操作的进程组。如果不指定,默认会使用系统默认的进程组。默认值为None

  • device (torch.device, 可选) – 如果不为 None,则在此设备上接收数据。默认值是 None

返回值

发送者的排名。如果该排名不在组内,则值为-1。若排名在组内,object_list 将包含从 src 发送的对象。

注意

对于基于 NCCL 的进程组,对象的内部张量表示必须在通信之前移动到 GPU 设备。设备由 torch.cuda.current_device() 给出,并且用户负责通过 torch.cuda.set_device() 为每个进程设置单独的 GPU。

警告

recv_object_list() 隐式使用了不安全的 pickle 模块。可以构造恶意的 pickle 数据,在反序列化时执行任意代码。因此,仅在完全信任数据的情况下调用此函数。

警告

使用recv_object_list() 与 GPU 张量调用时支持度不高且效率低下,因为这会引发 GPU 到 CPU 的数据传输,而张量会被序列化。请考虑改用 recv()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>>     dist.send_object_list(objects, dst=1, device=device)
>>> else:
>>>     objects = [None, None, None]
>>>     dist.recv_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed.batch_isend_irecv(p2p_op_list)[源代码]

异步发送或接收一批张量,并返回请求列表。

处理p2p_op_list中每个操作,并返回相应的请求。目前支持的后端包括 NCCL、Gloo 和 UCC。

参数

p2p_op_list – 一个点对点操作的列表(每个操作类型为 torch.distributed.P2POp)。列表中 isend 和 irecv 的顺序很重要,需要与远程端对应的 isend 和 irecv 匹配。

返回值

通过调用 op_list 中相应操作所返回的分布式请求对象列表。

示例

>>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank
>>> recv_tensor = torch.randn(2, dtype=torch.float32)
>>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1)%world_size)
>>> recv_op = dist.P2POp(dist.irecv, recv_tensor, (rank - 1 + world_size)%world_size)
>>> reqs = batch_isend_irecv([send_op, recv_op])
>>> for req in reqs:
>>>     req.wait()
>>> recv_tensor
tensor([2, 3])     # Rank 0
tensor([0, 1])     # Rank 1

注意

请注意,当此API与NCCL PG后端一起使用时,用户必须使用torch.cuda.set_device来设置当前GPU设备,否则可能会导致意外的程序挂起问题。

此外,如果这是传递给 dist.P2POpgroup 中的第一个集体调用,则 group 中的所有 rank 必须参与此 API 调用;否则行为未定义。如果这不是 group 中的第一个集体调用,则允许仅涉及部分 rank 的批处理 P2P 操作。

torch.distributed.P2POp(op, tensor, peer, group=None, tag=0)[源代码]

用于构建点对点操作的类,适用于batch_isend_irecv

此类用于构建点对点操作的类型、通信缓冲区、对等体排名、进程组和标签。此类的实例将被传递给 batch_isend_irecv 以实现点对点通信。

参数
  • op (Callable) – 用于向对等进程发送或接收数据的函数。其类型可以是 torch.distributed.isendtorch.distributed.irecv

  • tensor (Tensor) — 需要发送或接收的张量。

  • peer (int) – 目标或源的秩。

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • tag (int, 可选) – 匹配发送和接收的数据标签。

同步与异步集合操作

每个集体操作函数根据传递给集体操作的async_op标志的不同设置,支持以下两种操作:

同步操作 - 默认模式,在async_op 设置为 False 时生效。当函数返回时,可以保证集合操作已经完成。但是,对于 CUDA 操作,并不能确保其已完成,因为 CUDA 操作是异步的。在 CPU 集体操作中,任何进一步使用集体调用输出的功能调用将按预期工作。而在 CUDA 集体操作中,只有在同一 CUDA 流上的功能调用才会按预期行为工作。用户需要注意,在不同流运行时需要进行同步处理。有关流同步等 CUDA 语义的详细信息,请参阅 CUDA 语义。请查看下面的脚本,以了解 CPU 和 CUDA 操作在这些语义上的差异。

异步操作 - 当 async_op 设置为 True 时,集体操作函数会返回一个分布式请求对象。通常情况下,你无需手动创建该对象,并且它保证支持以下两种方法:

  • is_completed() - 在CPU集体操作的情况下,如果已完成则返回True。在CUDA操作的情况下,如果操作已成功加入到一个CUDA流中,并且输出可以在默认流中使用而无需进一步同步,则也返回True

  • wait() - 在CPU集体操作情况下,会阻塞进程直至操作完成;而在CUDA集体操作情况下,则会阻塞直至操作成功添加到一个CUDA流中,并且输出可以在默认流中直接使用,无需额外同步。

  • get_future() - 返回 torch._C.Future 对象。支持 NCCL,并且在 GLOO 和 MPI 上也支持大多数操作(点对点操作除外)。注意:随着我们继续采用 Futures 并合并 API,get_future() 调用可能会变得不再必要。

示例

以下代码可作为使用分布式集合时CUDA操作语义的参考。它显示了在不同的CUDA流中使用集合输出时需要进行显式同步。

# Code runs on each rank.
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# Wait ensures the operation is enqueued, but not necessarily complete.
handle.wait()
# Using result on non-default stream.
with torch.cuda.stream(s):
    s.wait_stream(torch.cuda.default_stream())
    output.add_(100)
if rank == 0:
    # if the explicit call to wait_stream was omitted, the output below will be
    # non-deterministically 1 or 101, depending on whether the allreduce overwrote
    # the value after the add completed.
    print(output)

集合函数

torch.distributed.broadcast(tensor, src, group=None, async_op=False)[源代码]

将张量发送到整个组。

tensor 在所有参与集体操作的进程中必须拥有相同的元素数量。

参数
  • tensor (Tensor) – 如果 src 是当前进程的排名,则发送数据;否则,使用该张量来保存接收到的数据。

  • src (int) – 全局进程组中的源排名(与 group 参数无关)。

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • async_op (bool, 可选) – 是否将此操作设置为异步操作

返回值

如果设置 async_op 为 True,则进行异步工作处理。否则(即未设置 async_op 或不属于该组),返回 None。

torch.distributed.broadcast_object_list(object_list, src=0, group=None, device=None)[源代码]

object_list中的可pickle对象广播到整个组。

类似于broadcast(),但可以传入 Python 对象。需要注意的是,object_list 中的所有对象都必须是可以被序列化的,才能进行广播。

参数
  • object_list (List[Any]) – 输入对象的列表,这些对象需要进行广播。每个对象必须是可以序列化的。只有位于src rank 的对象会被广播,但每个 rank 必须提供相同大小的列表。

  • src (int) – 指定广播 object_list 的源 rank。该源 rank 基于全局进程组,不受 group 参数的影响。

  • group – (ProcessGroup,可选):指定要操作的进程组。如果不指定,默认会使用系统默认的进程组。默认值为None

  • device (torch.device, 可选) – 如果不为 None,对象会被序列化并转换成张量,在广播之前移动到指定的 device。默认值为 None

返回值

None。如果 rank 属于该组,object_list 将包含从 src rank 广播的对象。

注意

对于基于 NCCL 的进程组,对象的内部张量表示必须在通信之前移动到 GPU 设备。设备由 torch.cuda.current_device() 给出,并且用户负责通过 torch.cuda.set_device() 为每个进程设置单独的 GPU。

注意

请注意,此 API 与broadcast() 集体操作略有不同,因为它不提供 async_op 句柄,因此会是阻塞调用。

警告

broadcast_object_list() 隐式使用了不安全的 pickle 模块。可以构造恶意的 pickle 数据,在反序列化时执行任意代码。因此,仅在完全信任数据的情况下调用此函数。

警告

调用broadcast_object_list() 与 GPU 张量一起使用时支持度不高且效率低下,因为这会导致 GPU 到 CPU 的数据传输(张量会被序列化)。请考虑改用 broadcast()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 3.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>> else:
>>>     objects = [None, None, None]
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> dist.broadcast_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[源代码]

以使所有机器都得到最终结果的方式减少张量数据。

在调用tensor之后,所有进程中的该张量将比特对比特相同。

支持复杂的张量。

参数
  • tensor (Tensor) - 集合操作的输入和输出。该函数会对数据进行原地操作。

  • op (可选) – 指定来自 torch.distributed.ReduceOp 枚举的值,用于定义逐元素归约操作。

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • async_op (bool, 可选) – 是否将此操作设置为异步操作

返回值

如果设置 async_op 为 True,则进行异步工作处理。否则(即未设置 async_op 或不属于该组),返回 None。

示例

>>> # All tensors below are of torch.int64 type.
>>> # We have 2 process groups, 2 ranks.
>>> device = torch.device(f'cuda:{rank}')
>>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4, 6], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 process groups, 2 ranks.
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0
tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
torch.distributed.reduce(tensor, dst, op=<RedOpType.SUM: 0>, group=None, async_op=False)[源代码]

在所有机器上减少张量数据。

只有排名为 dst 的进程将会接收到最终结果。

参数
  • tensor (Tensor) - 集合操作的输入和输出。该函数会对数据进行原地操作。

  • dst (int) – 目标在全球进程组中的秩(与 group 参数无关)

  • op (可选) – 指定来自 torch.distributed.ReduceOp 枚举的值,用于定义逐元素归约操作。

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • async_op (bool, 可选) – 是否将此操作设置为异步操作

返回值

如果设置 async_op 为 True,则进行异步工作处理。否则(即未设置 async_op 或不属于该组),返回 None。

torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[源代码]

将整个组中的张量汇总到一个列表中。

支持复杂和大小不一的张量。

参数
  • tensor_list (list[Tensor]) – 输出列表。该列表应包含用于集体输出的正确大小的张量。支持不同大小的张量。

  • tensor (Tensor) – 当前进程中需要广播的张量。

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • async_op (bool, 可选) – 是否将此操作设置为异步操作

返回值

如果设置 async_op 为 True,则进行异步工作处理。否则(即未设置 async_op 或不属于该组),返回 None。

示例

>>> # All tensors below are of torch.int64 dtype.
>>> # We have 2 process groups, 2 ranks.
>>> device = torch.device(f'cuda:{rank}')
>>> tensor_list = [torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2)]
>>> tensor_list
[tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0
[tensor([0, 0], device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1
>>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0
[tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1
>>> # All tensors below are of torch.cfloat dtype.
>>> # We have 2 process groups, 2 ranks.
>>> tensor_list = [torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2)]
>>> tensor_list
[tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0
[tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0
[tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1
torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[源代码]

从所有设备收集张量,并将它们合并到一个输出张量中。

此函数要求每个进程中的所有张量具有相同的大小。

参数
  • output_tensor (Tensor) – 用于容纳来自所有排名输入张量元素的输出张量。它必须按照以下一种形式正确设置:(i) 沿主要维度将所有输入张量连接起来;关于“连接”的定义,请参见torch.cat(); (ii) 沿主要维度堆叠所有输入张量;关于“堆叠”的定义,请参见torch.stack()。下面的示例将更好地解释支持的输出形式。

  • input_tensor (Tensor) – 从当前 rank 收集的张量。与 all_gather API 不同,此 API 中的输入张量在所有 ranks 上必须具有相同的大小。

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • async_op (bool, 可选) – 是否将此操作设置为异步操作

返回值

如果设置 async_op 为 True,则进行异步工作处理。否则(即未设置 async_op 或不属于该组),返回 None。

示例

>>> # All tensors below are of torch.int64 dtype and on CUDA devices.
>>> # We have two ranks.
>>> device = torch.device(f'cuda:{rank}')
>>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor_in
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> # Output in concatenation form
>>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device)
>>> dist.all_gather_into_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([1, 2, 3, 4], device='cuda:0') # Rank 0
tensor([1, 2, 3, 4], device='cuda:1') # Rank 1
>>> # Output in stack form
>>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device)
>>> dist.all_gather_into_tensor(tensor_out2, tensor_in)
>>> tensor_out2
tensor([[1, 2],
        [3, 4]], device='cuda:0') # Rank 0
tensor([[1, 2],
        [3, 4]], device='cuda:1') # Rank 1

警告

Gloo 后端不支持该 API。

torch.distributed.all_gather_object(object_list, obj, group=None)[源代码]

将整个组中的可pickle对象收集到一个列表中。

类似于all_gather(),但是可以传递 Python 对象。需要注意的是,这些对象必须是可pickle的,才能够被收集。

参数
  • object_list (list[Any]) – 输出列表。它的大小应与该集体操作的组大小一致,并包含最终结果。

  • obj (Any) – 可序列化的 Python 对象,将从当前进程进行广播。

  • group (ProcessGroup, 可选) – 操作的目标进程组。若未指定,则使用默认进程组。默认值为 None

返回值

如果没有影响,表示调用的rank属于该组时,集合操作的结果会填充到输入的object_list中。如果调用的rank不属于该组,则传入的object_list将保持不变。

注意

请注意,此 API 与all_gather() 集体操作略有不同,因为它不提供 async_op 句柄,因此将是一个阻塞调用。

注意

对于基于NCCL的处理组,在通信发生之前,对象的内部张量表示必须移动到GPU设备。在这种情况下,使用的设备由torch.cuda.current_device()给出,并且用户负责通过torch.cuda.set_device()为每个进程设置单独的GPU。

警告

all_gather_object() 隐式地使用了不安全的 pickle 模块。可以构造恶意的 pickle 数据,在反序列化时执行任意代码。因此,仅在信任数据的情况下调用此函数。

警告

使用 all_gather_object() 与 GPU 张量一起时支持度不高且效率低下,因为它会触发 GPU 到 CPU 的数据传输,因为张量会被序列化。建议改用 all_gather()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes world_size of 3.
>>> gather_objects = ["foo", 12, {1: 2}] # any picklable object
>>> output = [None for _ in gather_objects]
>>> dist.all_gather_object(output, gather_objects[dist.get_rank()])
>>> output
['foo', 12, {1: 2}]
torch.distributed.gather(tensor, gather_list=None, dst=0, group=None, async_op=False)[源代码]

在一个进程中汇总张量列表。

此函数要求每个进程中的所有张量具有相同的大小。

参数
  • tensor (Tensor) - 输入的张量。

  • gather_list (list[Tensor], 可选) – 用于收集数据的适当大小相同的张量列表(默认为 None,必须在目标 rank 上指定)

  • dst (int, 可选) – 目标全局进程组中的目标排名(与group参数无关)。默认值为0。

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • async_op (bool, 可选) – 是否将此操作设置为异步操作

返回值

如果设置 async_op 为 True,则进行异步工作处理。否则(即未设置 async_op 或不属于该组),返回 None。

torch.distributed.gather_object(obj, object_gather_list=None, dst=0, group=None)[源代码]

在一个进程中从整个组中收集可pickle的对象。

类似于gather(),但可以传入Python对象。需要注意的是,对象必须是可以序列化的,才能被收集。

参数
  • obj (Any) – 输入对象。该对象必须是可以被序列化的。

  • object_gather_list (list[Any]) – 输出列表。在dst rank上,它的大小应与该集体操作的组大小一致,并包含输出结果。非dst rank上的值必须为None。(默认值是None

  • dst (int, 可选) – 目标全局进程组中的目标排名(与group参数无关)。默认值为0。

  • group – (ProcessGroup,可选):指定要操作的进程组。如果不指定,默认会使用系统默认的进程组。默认值为None

返回值

无。在dst排名上,object_gather_list将包含集体操作的结果。

注意

请注意,此 API 与 gather 集体操作有所不同,因为它没有提供 async_op 句柄,因此会是一个阻塞调用。

注意

对于基于NCCL的处理组,在通信发生之前,对象的内部张量表示必须移动到GPU设备。在这种情况下,使用的设备由torch.cuda.current_device()给出,并且用户负责通过torch.cuda.set_device()为每个进程设置单独的GPU。

警告

gather_object() 隐式地使用了不安全的 pickle 模块。可以构造恶意的 pickle 数据,在反序列化期间执行任意代码。仅在信任数据的情况下调用此函数。

警告

调用gather_object() 与 GPU 张量一起使用时支持度不高且效率低下,因为这会导致 GPU 到 CPU 的数据传输(张量会被序列化)。请考虑改用gather()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes world_size of 3.
>>> gather_objects = ["foo", 12, {1: 2}] # any picklable object
>>> output = [None for _ in gather_objects]
>>> dist.gather_object(
...     gather_objects[dist.get_rank()],
...     output if dist.get_rank() == 0 else None,
...     dst=0
... )
>>> # On rank 0
>>> output
['foo', 12, {1: 2}]
torch.distributed.scatter(tensor, scatter_list=None, src=0, group=None, async_op=False)[源代码]

将张量列表分散到组中所有进程。

每个进程将接收到一个张量,并将其数据存储在 tensor 参数中。

支持复杂的张量。

参数
  • tensor (Tensor) - 输出的张量。

  • scatter_list (list[Tensor]) – 需要散列的张量列表(默认为 None,在源 rank 上必须指定)

  • src (int) – 源的全局进程组中的排名。默认值为 0。

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • async_op (bool, 可选) – 是否将此操作设置为异步操作

返回值

如果设置 async_op 为 True,则进行异步工作处理。否则(即未设置 async_op 或不属于该组),返回 None。

注意

请注意,scatter_list 中的所有张量必须大小相同。

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> tensor_size = 2
>>> t_ones = torch.ones(tensor_size)
>>> t_fives = torch.ones(tensor_size) * 5
>>> output_tensor = torch.zeros(tensor_size)
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     # Only tensors, all of which must be the same size.
>>>     scatter_list = [t_ones, t_fives]
>>> else:
>>>     scatter_list = None
>>> dist.scatter(output_tensor, scatter_list, src=0)
>>> # Rank i gets scatter_list[i]. For example, on rank 1:
>>> output_tensor
tensor([5., 5.])
torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list, src=0, group=None)[源代码]

scatter_object_input_list中的可pickle对象分发到整个组。

类似于scatter(),但可以传递 Python 对象。在每个进程中,散射的对象将被存储为scatter_object_output_list的第一个元素。需要注意的是,scatter_object_input_list中的所有对象都必须是可 picklable 的。

参数
  • scatter_object_output_list (List[Any]) – 一个非空列表,其中的第一个元素用于存储散射到当前 rank 的对象。

  • scatter_object_input_list (List[Any]) – 输入对象的列表,这些对象将被散列处理。每个对象必须是可以序列化的(picklable)。只有在 src 秩上的对象会被散列,而非 src 秩上的参数可以为 None

  • src (int) – 指定散射scatter_object_input_list的源 rank。该源 rank 基于全局进程组,与参数group无关。

  • group – (ProcessGroup,可选):指定要操作的进程组。如果不指定,默认会使用系统默认的进程组。默认值为None

返回值

None。如果 rank 属于该组,scatter_object_output_list 的第一个元素将被设置为当前 rank 的散列对象。

注意

请注意,此 API 与 scatter 集体操作有所不同,因为它没有提供 async_op 句柄,因此会是一个阻塞调用。

警告

scatter_object_list() 隐式使用了不安全的 pickle 模块。可以构造恶意的 pickle 数据,在反序列化时执行任意代码。因此,仅在完全信任数据的情况下调用此函数。

警告

调用scatter_object_list() 与 GPU 张量一起使用时支持度不高且效率低下,因为这会导致张量从 GPU 到 CPU 的传输(由于序列化)。请考虑改用scatter()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 3.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>> else:
>>>     # Can be any list on non-src ranks, elements are not used.
>>>     objects = [None, None, None]
>>> output_list = [None]
>>> dist.scatter_object_list(output_list, objects, src=0)
>>> # Rank i gets objects[i]. For example, on rank 2:
>>> output_list
[{1: 2}]
torch.distributed.reduce_scatter(output, input_list, op=<RedOpType.SUM: 0>, group=None, async_op=False)[源代码]

将张量列表进行缩减,然后分散到组内所有进程。

参数
  • output (Tensor) – 输出的张量。

  • input_list (list[Tensor]) – 需要减少并散列的张量列表。

  • op (可选) – 指定来自 torch.distributed.ReduceOp 枚举的值,用于定义逐元素归约操作。

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • async_op (bool, 可选) – 是否将此操作设置为异步操作。

返回值

如果设置 async_op 为 True,则进行异步工作处理。否则(即未设置 async_op 或不属于该组),返回 None。

torch.distributed.reduce_scatter_tensor(output, input, op=<RedOpType.SUM: 0>, group=None, async_op=False)[源代码]

先减少再将张量分散到组中所有的排名。

参数
  • output (Tensor) – 输出张量。它在所有设备上应具有相同大小。

  • input (Tensor) – 输入张量,将被缩减并分散。其大小应该是输出张量的大小乘以世界大小。输入张量可以具有以下形状之一:(i) 沿主要维度拼接的多个输出张量,或 (ii) 沿主要维度堆叠的多个输出张 tensor。关于“拼接”的定义,请参见 torch.cat()。关于“堆叠”的定义,请参见 torch.stack()

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • async_op (bool, 可选) – 是否将此操作设置为异步操作。

返回值

如果设置 async_op 为 True,则进行异步工作处理。否则(即未设置 async_op 或不属于该组),返回 None。

示例

>>> # All tensors below are of torch.int64 dtype and on CUDA devices.
>>> # We have two ranks.
>>> device = torch.device(f'cuda:{rank}')
>>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device)
>>> # Input in concatenation form
>>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device)
>>> tensor_in
tensor([0, 1, 2, 3], device='cuda:0') # Rank 0
tensor([0, 1, 2, 3], device='cuda:1') # Rank 1
>>> dist.reduce_scatter_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([0, 2], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
>>> # Input in stack form
>>> tensor_in = torch.reshape(tensor_in, (world_size, 2))
>>> tensor_in
tensor([[0, 1],
        [2, 3]], device='cuda:0') # Rank 0
tensor([[0, 1],
        [2, 3]], device='cuda:1') # Rank 1
>>> dist.reduce_scatter_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([0, 2], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1

警告

Gloo 后端不支持该 API。

torch.distributed.all_to_all_single(output, input, output_split_sizes=None, input_split_sizes=None, group=None, async_op=False)[源代码]

拆分输入张量,然后将其分散到组中所有进程。

随后,从组内所有进程中接收的张量会被拼接在一起,并作为单个输出张量返回。

支持复杂的张量。

参数
  • output (Tensor) – 汇聚后的拼接输出张量。

  • input (Tensor) – 需要进行scatter操作的输入张量。

  • output_split_sizes – (list[Int], 可选): 如果未指定值或为空,output 张量的 dim 0 必须能被 world_size 整除。如果指定了输出分割大小,则按照指定尺寸在 dim 0 上对 output 张量进行分割。

  • input_split_sizes – (list[Int], 可选): 如果未指定值,则 input 张量的 dim 0 必须能够被 world_size 均匀整除。

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • async_op (bool, 可选) – 是否将此操作设置为异步操作。

返回值

如果设置 async_op 为 True,则进行异步工作处理。否则(即未设置 async_op 或不属于该组),返回 None。

警告

all_to_all_single 是试验性的,可能随时发生变化。

示例

>>> input = torch.arange(4) + rank * 4
>>> input
tensor([0, 1, 2, 3])     # Rank 0
tensor([4, 5, 6, 7])     # Rank 1
tensor([8, 9, 10, 11])   # Rank 2
tensor([12, 13, 14, 15]) # Rank 3
>>> output = torch.empty([4], dtype=torch.int64)
>>> dist.all_to_all_single(output, input)
>>> output
tensor([0, 4, 8, 12])    # Rank 0
tensor([1, 5, 9, 13])    # Rank 1
tensor([2, 6, 10, 14])   # Rank 2
tensor([3, 7, 11, 15])   # Rank 3
>>> # Essentially, it is similar to following operation:
>>> scatter_list = list(input.chunk(world_size))
>>> gather_list  = list(output.chunk(world_size))
>>> for i in range(world_size):
>>>     dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
>>> # Another example with uneven split
>>> input
tensor([0, 1, 2, 3, 4, 5])                                       # Rank 0
tensor([10, 11, 12, 13, 14, 15, 16, 17, 18])                     # Rank 1
tensor([20, 21, 22, 23, 24])                                     # Rank 2
tensor([30, 31, 32, 33, 34, 35, 36])                             # Rank 3
>>> input_splits
[2, 2, 1, 1]                                                     # Rank 0
[3, 2, 2, 2]                                                     # Rank 1
[2, 1, 1, 1]                                                     # Rank 2
[2, 2, 2, 1]                                                     # Rank 3
>>> output_splits
[2, 3, 2, 2]                                                     # Rank 0
[2, 2, 1, 2]                                                     # Rank 1
[1, 2, 1, 2]                                                     # Rank 2
[1, 2, 1, 1]                                                     # Rank 3
>>> output = ...
>>> dist.all_to_all_single(output, input, output_splits, input_splits)
>>> output
tensor([ 0,  1, 10, 11, 12, 20, 21, 30, 31])                     # Rank 0
tensor([ 2,  3, 13, 14, 22, 32, 33])                             # Rank 1
tensor([ 4, 15, 16, 23, 34, 35])                                 # Rank 2
tensor([ 5, 17, 18, 24, 36])                                     # Rank 3
>>> # Another example with tensors of torch.cfloat type.
>>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j)
>>> input
tensor([1+1j, 2+2j, 3+3j, 4+4j])                                # Rank 0
tensor([5+5j, 6+6j, 7+7j, 8+8j])                                # Rank 1
tensor([9+9j, 10+10j, 11+11j, 12+12j])                          # Rank 2
tensor([13+13j, 14+14j, 15+15j, 16+16j])                        # Rank 3
>>> output = torch.empty([4], dtype=torch.int64)
>>> dist.all_to_all_single(output, input)
>>> output
tensor([1+1j, 5+5j, 9+9j, 13+13j])                              # Rank 0
tensor([2+2j, 6+6j, 10+10j, 14+14j])                            # Rank 1
tensor([3+3j, 7+7j, 11+11j, 15+15j])                            # Rank 2
tensor([4+4j, 8+8j, 12+12j, 16+16j])                            # Rank 3
torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False)[源代码]

将输入张量列表分散到组中的所有进程,并在输出列表中返回汇集的张量列表。

支持复杂的张量。

参数
  • output_tensor_list (list[Tensor]) – 每个 rank 对应一个张量的张量列表,需要进行收集。

  • input_tensor_list (list[Tensor]) – 每个排名上要分散的张量列表。

  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • async_op (bool, 可选) – 是否将此操作设置为异步操作。

返回值

如果设置 async_op 为 True,则进行异步工作处理。否则(即未设置 async_op 或不属于该组),返回 None。

警告

all_to_all 是试验性的,可能随时发生变化。

示例

>>> input = torch.arange(4) + rank * 4
>>> input = list(input.chunk(4))
>>> input
[tensor([0]), tensor([1]), tensor([2]), tensor([3])]     # Rank 0
[tensor([4]), tensor([5]), tensor([6]), tensor([7])]     # Rank 1
[tensor([8]), tensor([9]), tensor([10]), tensor([11])]   # Rank 2
[tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3
>>> output = list(torch.empty([4], dtype=torch.int64).chunk(4))
>>> dist.all_to_all(output, input)
>>> output
[tensor([0]), tensor([4]), tensor([8]), tensor([12])]    # Rank 0
[tensor([1]), tensor([5]), tensor([9]), tensor([13])]    # Rank 1
[tensor([2]), tensor([6]), tensor([10]), tensor([14])]   # Rank 2
[tensor([3]), tensor([7]), tensor([11]), tensor([15])]   # Rank 3
>>> # Essentially, it is similar to following operation:
>>> scatter_list = input
>>> gather_list  = output
>>> for i in range(world_size):
>>>     dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i)
>>> input
tensor([0, 1, 2, 3, 4, 5])                                       # Rank 0
tensor([10, 11, 12, 13, 14, 15, 16, 17, 18])                     # Rank 1
tensor([20, 21, 22, 23, 24])                                     # Rank 2
tensor([30, 31, 32, 33, 34, 35, 36])                             # Rank 3
>>> input_splits
[2, 2, 1, 1]                                                     # Rank 0
[3, 2, 2, 2]                                                     # Rank 1
[2, 1, 1, 1]                                                     # Rank 2
[2, 2, 2, 1]                                                     # Rank 3
>>> output_splits
[2, 3, 2, 2]                                                     # Rank 0
[2, 2, 1, 2]                                                     # Rank 1
[1, 2, 1, 2]                                                     # Rank 2
[1, 2, 1, 1]                                                     # Rank 3
>>> input = list(input.split(input_splits))
>>> input
[tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])]                   # Rank 0
[tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1
[tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])]                 # Rank 2
[tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])]         # Rank 3
>>> output = ...
>>> dist.all_to_all(output, input)
>>> output
[tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])]   # Rank 0
[tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])]           # Rank 1
[tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])]              # Rank 2
[tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])]                  # Rank 3
>>> # Another example with tensors of torch.cfloat type.
>>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j)
>>> input = list(input.chunk(4))
>>> input
[tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])]            # Rank 0
[tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])]            # Rank 1
[tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])]      # Rank 2
[tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])]    # Rank 3
>>> output = list(torch.empty([4], dtype=torch.int64).chunk(4))
>>> dist.all_to_all(output, input)
>>> output
[tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])]          # Rank 0
[tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])]        # Rank 1
[tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])]        # Rank 2
[tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])]        # Rank 3
torch.distributed.barrier(group=None, async_op=False, device_ids=None)[源代码]

同步所有的进程。

这个集体阻塞进程,直到整个组进入该函数。如果 async_op 为 False,或者在异步工作句柄调用 wait() 方法时也会发生这种情况。

参数
  • group (ProcessGroup, 可选) - 操作的目标进程组。若未指定,则使用默认进程组。

  • async_op (bool, 可选) – 是否将此操作设置为异步操作

  • device_ids ([int], 可选) – 设备或GPU的ID列表。

返回值

如果设置 async_op 为 True,则进行异步工作处理。否则(即未设置 async_op 或不属于该组),返回 None。

注意

ProcessGroupNCCL 现在使用流同步而非设备同步来阻塞 CPU。因此,请不要认为 barrier() 会进行设备同步。

torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[源代码]

类似于 torch.distributed.barrier 同步进程,但要考虑到可配置的超时时间。

它能够在提供的超时时间内报告未通过此屏障的排名。具体来说,对于非零排名,将阻塞直到从排名 0 处理了发送/接收操作;而排名 0 则会等待所有其他排名的发送和接收操作完成,并为未能及时响应的排名报告失败。需要注意的是,如果一个排名没有达到监控屏障(例如由于挂起),则所有其他排名在监控屏障中都会失败。

这个操作会阻塞组内的所有进程/排名,直到整个组成功退出函数,因此它在调试和同步方面非常有用。然而,这可能会影响性能,并且仅应在需要主机侧完全同步点的调试或特定场景中使用。为了调试目的,可以在应用程序的集体调用之前插入此屏障以检查是否有任何进程出现脱节。

注意

请注意,此集合仅与 GLOO 后端兼容。

参数
  • group (ProcessGroup, optional) – 操作的目标进程组。如果未指定(即为None),则使用默认的进程组。

  • timeout (datetime.timedelta, 可选) – 监视屏障的超时时间。如果为None,将使用默认的过程组超时设置。

  • wait_all_ranks (bool, 可选) – 是否收集所有失败的 rank。默认情况下,该值为 False,在 rank 0 上的 monitored_barrier 将会在遇到第一个失败的 rank 时抛出异常以快速失败。通过设置 wait_all_ranks=Truemonitored_barrier 将会收集所有失败的 rank 并抛出一个包含所有失败 rank 信息的错误。

返回值

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() != 1:
>>>     dist.monitored_barrier() # Raises exception indicating that
>>> # rank 1 did not call into monitored_barrier.
>>> # Example with wait_all_ranks=True
>>> if dist.get_rank() == 0:
>>>     dist.monitored_barrier(wait_all_ranks=True) # Raises exception
>>> # indicating that ranks 1, 2, ... world_size - 1 did not call into
>>> # monitored_barrier.
torch.distributed.Work

Work 对象代表了 PyTorch 分布式包中的一个待处理异步操作的句柄。这类对象通常由如 dist.all_reduce(tensor, async_op=True) 这样的非阻塞集体操作返回。

torch.distributed.ReduceOp

一个类似枚举的类,包含可用的缩减操作:SUMPRODUCTMINMAXBANDBORBXORPREMUL_SUM

BANDBORBXOR 操作在使用 NCCL 后端时不可用。

AVG 在求和之前会将值除以世界大小。此功能只在 NCCL 后端中提供,并且需要 NCCL 版本 2.10 或以上。

PREMUL_SUM 在减少之前会将输入本地乘以给定的标量。PREMUL_SUM 只在 NCCL 后端中可用,并且需要 NCCL 2.11 或更高版本。用户应该使用 torch.distributed._make_nccl_premul_sum

此外,MAXMINPRODUCT 操作不适用于复数张量。

此类的值可以作为属性访问,例如:ReduceOp.SUM。这些值用于指定减少集体操作的策略,例如:reduce()

此类不支持 __members__ 属性。

torch.distributed.reduce_op

已弃用的用于约简操作的枚举类:SUMPRODUCTMINMAX

建议使用ReduceOp

集体通信分析

请注意,你可以使用torch.profiler(推荐,仅在1.8.1及以上版本可用)或torch.autograd.profiler来对这里提到的集体通信和点对点通信API进行性能分析。所有内置后端(glooncclmpi)都受支持,并且在性能分析输出/跟踪中,集体通信的使用情况将如预期般显示。对代码进行性能分析与常规torch操作相同:

import torch
import torch.distributed as dist
with torch.profiler():
    tensor = torch.randn(20, 10)
    dist.all_reduce(tensor)

请参阅性能分析器文档,以获取有关性能分析器功能的完整概述。

多GPU集体函数

警告

多GPU功能(每个CPU线程使用多个GPU)已被弃用。目前,PyTorch 分布式系统的首选编程模型是每线程一个设备,如本文档中的API所示。如果你是一名后端开发者,并希望支持每线程多个设备,请联系 PyTorch 分布式的维护者。

第三方后端

除了内置的GLOO/MPI/NCCL后端,PyTorch分布式还支持通过运行时注册机制使用第三方后端。关于如何通过C++扩展开发第三方后端的参考,请参阅教程 - 自定义C++和CUDA扩展test/cpp_extensions/cpp_c10d_extension.cpp。第三方后端的功能取决于它们自身的实现。

新的后端从 c10d::ProcessGroup 派生,并在导入时通过调用 torch.distributed.Backend.register_backend() 注册后端名称和实例化接口。

当手动导入此后端,并在调用 torch.distributed.init_process_group() 时使用相应的后端名称,torch.distributed 包将在新后端上运行。

警告

第三方后端的支持处于试验阶段,可能随时发生变化。

启动实用程序

torch.distributed 还提供了一个启动工具 torch.distributed.launch,可以用于在每个节点上启动多个进程来进行分布式训练。

模块 torch.distributed.launch

torch.distributed.launch 是一个模块,用于在每个训练节点上启动多个分布式训练进程。

警告

此模块将被弃用,并由torchrun替代。

该工具可用于单节点分布式训练,在这种情况下,每个节点将启动一个或多个进程。它可以用于CPU训练和GPU训练。如果用于GPU训练,则每个分布式进程将在单一GPU上运行,从而显著提升单节点的训练性能。此外,它还可以通过在每个节点上启动多个进程来支持多节点分布式训练,以实现更好的多节点分布式训练性能。这对于具有直接支持GPU的多个Infiniband接口的系统尤其有益,因为所有这些接口都可以用于聚合通信带宽。

无论是单节点分布式训练还是多节点分布式训练,在这两种情况下,该工具都会在每个节点上启动指定数量的进程(--nproc-per-node)。如果用于 GPU 训练,则该数字需要小于或等于当前系统上的 GPU 数量 (nproc_per_node)。每个进程将使用从 GPU 0 到 GPU (nproc_per_node - 1) 的单个 GPU。

如何使用此模块:

  1. 单节点多进程分布式训练

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
           arguments of your training script)
  1. 多节点多进程分布式训练示例:两个节点

节点 1: (IP 地址: 192.168.1.1,空闲端口: 1234)

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node-rank=0 --master-addr="192.168.1.1"
           --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)

节点 2:

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node-rank=1 --master-addr="192.168.1.1"
           --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
  1. 查看此模块提供的可选参数有哪些:

python -m torch.distributed.launch --help

重要通知:

1. 此工具和多进程分布式(单节点或多节点)的 GPU 训练目前只能通过 NCCL 分布式后端实现最佳性能。因此,推荐在 GPU 训练中使用 NCCL 后端。

2. 在你的训练程序中,必须解析命令行参数 --local-rank=LOCAL_PROCESS_RANK,该参数将由本模块提供。如果使用 GPU 进行训练,应确保代码仅在 LOCAL_PROCESS_RANK 对应的 GPU 设备上运行。这可以通过以下方式实现:

解析 local_rank 参数

>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()

将设备设置为本地排名,可选择以下任意一种方法

>>> torch.cuda.set_device(args.local_rank)  # before your code runs

或者

>>> with torch.cuda.device(args.local_rank):
>>>    # your code to run
>>>    ...

从版本 2.0.0 开始: 启动程序将会把 --local-rank=<rank> 参数传递给你的脚本。从 PyTorch 2.0.0 版本开始,推荐使用破折号形式的 --local-rank,而之前使用的下划线形式 --local_rank 已不再被优先考虑。

为了保持向后兼容性,用户可能需要在参数解析代码中同时处理两种情况:即在参数解析器中同时使用--local-rank--local_rank。如果只提供了--local-rank,启动程序将触发错误:“error: 未识别的参数:–local-rank=<rank>”。对于仅支持 PyTorch 2.0.0 及以上版本的训练代码,只需包含 --local-rank 即可。

3. 在你的训练程序开始时,应调用以下函数来启动分布式后端。强烈建议使用 init_method=env://。虽然其他初始化方法(如 tcp://)可能也能工作,但 env:// 是此模块官方支持的方法。

>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>>                                      init_method='env://')

4. 在你的训练程序中,你可以选择使用常规的分布式函数或torch.nn.parallel.DistributedDataParallel() 模块。如果你的训练程序在 GPU 上运行,并且希望使用该模块,以下是配置方法。

>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>>                                                   device_ids=[args.local_rank],
>>>                                                   output_device=args.local_rank)

请确保将 device_ids 参数设置为你代码运行的唯一 GPU 设备 ID,这通常是进程的本地排名。换句话说,为了使用此工具,device_ids 需要设置为 [args.local_rank],并且 output_device 需要设置为 args.local_rank

5. 另一种方式是通过环境变量 LOCAL_RANKlocal_rank 传递给子进程。当你使用 --use-env=True 启动脚本时,会启用此行为。你需要将上述子进程示例中的 args.local_rank 替换为 os.environ['LOCAL_RANK']; 当你指定此标志时,launcher 将不会传递 --local-rank

警告

local_rank 不是全局唯一的,它在每个进程中是唯一的。因此,不要用它来决定是否写入网络文件系统。参见https://github.com/pytorch/pytorch/issues/12042 了解如果不正确处理可能会出现的问题示例。

Spawn 工具

Multiprocessing包 - torch.multiprocessing还提供了torch.multiprocessing.spawn()中的spawn函数。此辅助函数可以用于启动多个进程,它通过传入你想要运行的函数并启动N个进程来执行该函数。这也可以用于多进程分布式训练。

关于如何使用它的参考资料,请参阅 PyTorch 示例 - ImageNet 实现

请注意,此功能要求使用Python 3.4或以上版本。

调试 torch.distributed 应用程序

调试分布式应用程序可能会遇到困难,例如难以理解的挂起、崩溃或跨进程的不一致行为。为了帮助解决这些问题,torch.distributed 提供了一系列自助式工具来调试训练应用程序。

Python 断点

在分布式环境中使用 Python 调试器很方便,但由于默认情况下无法直接使用,很多人根本不使用它。PyTorch 提供了一个定制化的 pdb 包装器,使这个过程更加简便。

torch.distributed.breakpoint 让这个过程变得简单。它以两种方式自定义了 pdb 的断点行为,除此之外表现得像正常的 pdb:1. 只在一个指定的进程中附加调试器;2. 使用torch.distributed.barrier() 确保所有其他进程停止,并在被调试的进程发出continue指令后释放;3. 从子进程中重定向 stdin 连接到你的终端。

要使用它,只需在所有进程中发出torch.distributed.breakpoint(rank),并确保每次使用的rank值相同。

受监控的屏障

从 v1.10 版本开始,torch.distributed.monitored_barrier() 作为 torch.distributed.barrier() 的替代方案存在。当崩溃时,它会提供有关哪个 rank 可能有故障的有用信息,即不是所有调用 torch.distributed.monitored_barrier() 的 rank 都在提供的超时时间内完成。torch.distributed.monitored_barrier() 使用类似于确认的进程中的 send/recv 通信原语实现了一个主机端屏障,允许 rank 0 报告哪些 rank 没有及时确认屏障。例如,在以下函数中,rank 1 失败于调用 torch.distributed.monitored_barrier()(在实践中这可能是由于应用程序错误或之前的集体操作挂起):

import os
from datetime import timedelta

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    # monitored barrier requires gloo process group to perform host-side sync.
    group_gloo = dist.new_group(backend="gloo")
    if rank not in [1]:
        dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    mp.spawn(worker, nprocs=2, args=())

以下错误消息在 rank 0 上生成,允许用户确定可能存在问题的 rank 并进行进一步调查。

RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
 Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594

TORCH_DISTRIBUTED_DEBUG

当设置了环境变量 TORCH_CPP_LOG_LEVEL=INFO 时,可以使用环境变量 TORCH_DISTRIBUTED_DEBUG 来触发额外的有用日志记录和集体同步检查,以确保所有进程都适当同步。TORCH_DISTRIBUTED_DEBUG 可以设置为 OFF(默认值)、INFODETAIL,具体取决于所需的调试级别。需要注意的是,最详细的选项 DETAIL 可能会对应用程序性能产生影响,因此仅在调试问题时使用。

设置 TORCH_DISTRIBUTED_DEBUG=INFO 可以在使用 torch.nn.parallel.DistributedDataParallel() 初始化训练的模型时生成额外的调试日志。而设置 TORCH_DISTRIBUTED_DEBUG=DETAIL 还会记录选定迭代中的运行时性能统计信息,例如前向时间、后向时间和梯度通信时间等。例如,在以下应用程序中:

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


class TwoLinLayerNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.a = torch.nn.Linear(10, 10, bias=False)
        self.b = torch.nn.Linear(10, 1, bias=False)

    def forward(self, x):
        a = self.a(x)
        b = self.b(x)
        return (a, b)


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    torch.cuda.set_device(rank)
    print("init model")
    model = TwoLinLayerNet().cuda()
    print("init ddp")
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])

    inp = torch.randn(10, 10).cuda()
    print("train")

    for _ in range(20):
        output = ddp_model(inp)
        loss = output[0] + output[1]
        loss.sum().backward()


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
    os.environ[
        "TORCH_DISTRIBUTED_DEBUG"
    ] = "DETAIL"  # set to DETAIL for runtime logging.
    mp.spawn(worker, nprocs=2, args=())

以下日志在初始化时显示:

I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO

以下日志在运行时显示(当设置 TORCH_DISTRIBUTED_DEBUG=DETAIL 时):

I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0
 Avg forward compute time: 40838608
 Avg backward compute time: 5983335
Avg backward comm. time: 4326421
 Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0
 Avg forward compute time: 42850427
 Avg backward compute time: 3885553
Avg backward comm. time: 2357981
 Avg backward comm/comp overlap time: 2234674

此外,TORCH_DISTRIBUTED_DEBUG=INFO 在由于模型中存在未使用参数而导致的torch.nn.parallel.DistributedDataParallel() 崩溃时,增强了崩溃日志记录。目前,如果前向传递中有可能未使用的参数,则必须在初始化时将find_unused_parameters=True 传递给torch.nn.parallel.DistributedDataParallel()。从v1.10版本开始,所有模型输出都必须在损失计算中使用,因为torch.nn.parallel.DistributedDataParallel() 不支持反向传递中的未使用参数。这些约束条件对于大型模型尤其具有挑战性,在遇到错误时崩溃时,torch.nn.parallel.DistributedDataParallel() 将记录所有未使用的参数的完全限定名称。例如,在上述应用中,如果我们修改 loss 使其计算为loss = output[1],那么在反向传递过程中TwoLinLayerNet.a 将不会接收到梯度,从而导致DDP 失败。在崩溃时,用户会获得有关未使用的参数的信息,这对于大型模型来说可能很难手动查找:

RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing
 the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by
making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return va
lue of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameters which did not receive grad for rank 0: a.weight
Parameter indices which did not receive grad for rank 0: 0

设置 TORCH_DISTRIBUTED_DEBUG=DETAIL 将在每次用户直接或间接(例如 DDP 中的 allreduce)调用集体操作时触发额外的一致性和同步检查。这通过创建一个包装器进程组来实现,该包装器封装了由torch.distributed.init_process_group()torch.distributed.new_group() API 返回的所有进程组。因此,这些 API 将返回一个包装器进程组,可以像常规进程组一样使用,并在分发集体操作到底层进程组之前执行一致性检查。目前,这些检查包括torch.distributed.monitored_barrier(),它确保所有排名完成其未决的集体调用并报告卡住的排名。接下来,通过确保所有集体函数匹配且以一致的张量形状被调用来检查集体本身的一致性。如果不满足这些条件,在应用程序崩溃时将包含详细的错误报告,而不是挂起或无信息的错误消息。例如,考虑以下函数,该函数向torch.distributed.all_reduce() 输入了不匹配的输入形状:

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    torch.cuda.set_device(rank)
    tensor = torch.randn(10 if rank == 0 else 20).cuda()
    dist.all_reduce(tensor)
    torch.cuda.synchronize(device=rank)


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
    os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
    mp.spawn(worker, nprocs=2, args=())

使用NCCL后端时,这样的应用程序可能会导致挂起,在复杂场景中很难找到根本原因。如果用户启用 TORCH_DISTRIBUTED_DEBUG=DETAIL 并重新运行应用程序,则以下错误消息会揭示问题的根本原因:

work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes:  10
[ torch.LongTensor{1} ]

注意

为了在运行时更精细地控制调试级别,可以使用以下函数:torch.distributed.set_debug_level()torch.distributed.set_debug_level_from_env()torch.distributed.get_debug_level()

此外,TORCH_DISTRIBUTED_DEBUG=DETAIL 可以与 TORCH_SHOW_CPP_STACKTRACES=1 结合使用,在检测到集体异步时记录整个调用堆栈。这些检查适用于所有使用 c10d 集体调用的应用程序,这些调用由通过torch.distributed.init_process_group()torch.distributed.new_group() API 创建的进程组支持。

日志记录

除了通过torch.distributed.monitored_barrier()TORCH_DISTRIBUTED_DEBUG 提供的显式调试支持外,torch.distributed 底层 C++ 库还在不同级别输出日志消息。这些日志消息有助于理解分布式训练作业的执行状态,并且对于诊断网络连接失败等问题非常有帮助。以下矩阵展示了如何通过组合 TORCH_CPP_LOG_LEVELTORCH_DISTRIBUTED_DEBUG 环境变量来调整日志级别。

TORCH_CPP_LOG_LEVEL

TORCH_DISTRIBUTED_DEBUG

有效的日志级别

ERROR

忽略

错误

警告

忽略

警告

INFO

忽略

信息

INFO

INFO

调试

INFO

详情

跟踪(又称全部)

分布式组件会抛出从RuntimeError派生的自定义异常类型:

  • torch.distributed.DistError: 这是所有分布式异常的基类。

  • torch.distributed.DistBackendError: 当特定后端出现错误时抛出此异常。例如,如果使用了NCCL后端,并且用户尝试访问NCCL库不可用的GPU。

  • torch.distributed.DistNetworkError: 在网络库遇到错误(如连接被对端重置)时抛出此异常。

  • torch.distributed.DistStoreError: 当 Store 发生错误(例如 TCPStore 超时)时抛出此异常。

torch.distributed.DistError

在分布式库中发生错误时抛出的异常

torch.distributed.DistBackendError

在分布式系统中发生后端错误时抛出的异常

torch.distributed.DistNetworkError

在网络环境中发生分布式网络错误时抛出的异常

torch.distributed.DistStoreError

在分布式存储中发生错误时抛出的异常

如果你正在进行单节点训练,可以在脚本中方便地设置交互式断点。我们提供了一种为单一rank设置断点的便捷方法:

torch.distributed.breakpoint(rank=0, skip=0)[源代码]

在单个进程中设置断点,而其他所有进程将会在此断点完成前处于等待状态。

参数
  • rank (int) – 要打断的秩。默认值: 0

  • skip (int) – 跳过对这个断点的前 skip 次调用。默认值: 0

本页目录