torchrun(弹性启动)
扩展了 torch.distributed.launch
的功能。
torchrun
提供了比 torch.distributed.launch
更丰富的功能,包括以下额外功能:
-
当工人节点失败时,会通过重启所有工作节点来优雅地处理。
-
Worker 的
RANK
和WORLD_SIZE
会自动被分配。 -
节点的数量可以在最小值和最大值之间灵活调整(弹性)。
注意
torchrun
是一个 Python 控制台脚本,指向主模块 torch.distributed.run。该模块在 setup.py 的 entry_points
配置中声明。调用 torchrun
等同于执行 python -m torch.distributed.run
。
从torch.distributed.launch切换到torchrun
torchrun
支持与 torch.distributed.launch
相同的参数,除了 已被弃用的 --use-env
。要从 torch.distributed.launch
迁移到 torchrun
,请按照以下步骤操作:
-
如果你的训练脚本已经从
LOCAL_RANK
环境变量读取了local_rank
,那么你可以省略--use-env
标志,例如:torch.distributed.launch
torchrun
$ python-mtorch.distributed.launch--use-envtrain_script.py
$ torchruntrain_script.py
-
如果你的训练脚本从
--local-rank
命令行参数读取本地排名,请将其修改为从LOCAL_RANK
环境变量中读取,如以下代码片段所示:torch.distributed.launch
torchrun
import argparse parser = argparse.ArgumentParser() parser.add_argument("--local-rank", type=int) args = parser.parse_args() local_rank = args.local_rank
import os local_rank = int(os.environ["LOCAL_RANK"])
从版本 2.0.0 开始: 启动程序将会把 --local-rank=<rank>
参数传递给你的脚本。从 PyTorch 2.0.0 版本开始,推荐使用破折号形式的 --local-rank
,而之前使用的下划线形式 --local_rank
已不再被优先考虑。
为了保持向后兼容性,用户可能需要在参数解析代码中同时处理两种情况:即在参数解析器中同时使用--local-rank
和 --local_rank
。如果只提供了--local-rank
,启动程序将触发错误:“error: 未识别的参数:–local-rank=<rank>”。对于仅支持 PyTorch 2.0.0 及以上版本的训练代码,只需包含 --local-rank
即可。
>>> import argparse >>> parser = argparse.ArgumentParser() >>> parser.add_argument("--local-rank", "--local_rank", type=int) >>> args = parser.parse_args()
上述更改足以从 torch.distributed.launch
迁移到 torchrun
。要利用 torchrun
的新功能(例如弹性、容错和错误报告),请参阅:
-
训练脚本,了解更多关于如何编写符合
torchrun
规范的训练脚本的信息。 -
继续查看本页以了解更多关于
torchrun
的功能。
使用方法
单节点多工作进程
torchrun --standalone --nnodes=1 --nproc-per-node=$NUM_TRAINERS YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
单节点多工作者堆叠
要在同一主机上运行多个单节点多工实例(单独的任务),我们需要确保每个实例设置在不同的端口上以避免冲突。为此,请使用 --rdzv-backend=c10d
并通过设置 --rdzv-endpoint=localhost:$PORT_k
来指定不同端口。对于 --nodes=1
,通常让 torchrun
自动选择一个空闲的随机端口会更方便,而不是手动为每个运行分配不同的端口。
torchrun --rdzv-backend=c10d --rdzv-endpoint=localhost:0 --nnodes=1 --nproc-per-node=$NUM_TRAINERS YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
容错(固定工作进程数量,不具备弹性,可容忍三次失败)
torchrun --nnodes=$NUM_NODES --nproc-per-node=$NUM_TRAINERS --max-restarts=3 --rdzv-id=$JOB_ID --rdzv-backend=c10d --rdzv-endpoint=$HOST_NODE_ADDR YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
HOST_NODE_ADDR
的格式为 <host>[:<port>](例如 node1.example.com:29400),用于指定 C10d 会合后端实例化和托管的节点及端口。它可以是训练集群中的任意节点,但最好选择一个带宽较高的节点。
注意
如果没有指定端口号,HOST_NODE_ADDR
将默认为 29400。
弹性(min=1
,max=4
,能容忍最多3次成员变更或故障)
torchrun --nnodes=1:4 --nproc-per-node=$NUM_TRAINERS --max-restarts=3 --rdzv-id=$JOB_ID --rdzv-backend=c10d --rdzv-endpoint=$HOST_NODE_ADDR YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
HOST_NODE_ADDR
的格式为 <host>[:<port>](例如 node1.example.com:29400),用于指定 C10d 会合后端实例化和托管的节点及端口。它可以是训练集群中的任意节点,但最好选择一个带宽较高的节点。
注意
如果没有指定端口号,HOST_NODE_ADDR
将默认为 29400。
关于rendezvous后台的注释
-
--rdzv-id
: 唯一的作业ID(由参与该作业的所有节点共享) -
--rdzv-backend
: 实现torch.distributed.elastic.rendezvous.RendezvousHandler
接口 -
--rdzv-endpoint
: 会晤后端所在的终端点,通常格式为host:port
。
目前,默认支持的 rendezvous 后端包括 c10d
(推荐)、etcd-v2
和 etcd
(旧版)。若要使用 etcd-v2
或 etcd
,需要设置一个启用了 v2
API 的 etcd 服务器(例如:--enable-v2
)。
警告
etcd-v2
和 etcd
的会话使用 etcd API v2。你必须在 etcd 服务器上启用 v2 API。我们的测试使用的是 etcd 版本 3.4.3。
警告
对于基于 etcd 的会合,我们推荐使用 etcd-v2
而不是 etcd
。这两个在功能上是等价的,但 etcd-v2
使用了修订后的实现。而 etcd
处于维护模式,并将在未来的版本中被移除。
定义
-
Node
- 物理实例或容器;对应于作业管理器处理的基本单位。 -
Worker
- 分布式训练中的一种工作节点。 -
WorkerGroup
- 执行相同功能的工作者集合,例如训练器。 -
LocalWorkerGroup
- 工作组中在同一节点上运行的工人的一个子集。 -
RANK
- 工作者在工作组中的排名。 -
WORLD_SIZE
- 工作组中的总 worker 数量。 -
LOCAL_RANK
- 本地工作组中工作者的编号。 -
LOCAL_WORLD_SIZE
- 本地工作组的大小。 -
rdzv_id
- 用户定义的标识,用于唯一标识作业的工作组。每个节点使用该标识加入特定工作组。
-
rdzv_backend
- 用于会合的后端(例如c10d
),通常是一个强一致性的键值存储系统。 -
rdzv_endpoint
- 会面后端的端点,通常格式为<主机>:<端口>
。
一个Node
运行LOCAL_WORLD_SIZE
个工作者,这些工作者组成一个LocalWorkerGroup
。作业中所有节点的LocalWorkerGroups
的并集构成了WorkerGroup
。
环境变量
以下环境变量可供你在脚本中使用:
-
LOCAL_RANK
- 当前设备的本地排名。 -
RANK
- 全球排名。 -
GROUP_RANK
- 表示工作组的排名,范围是从 0 到max_nnodes
的一个数字。如果每个节点只运行一个工作组,则该值表示节点的排名。 -
ROLE_RANK
- 表示工作者在其相同角色的其他工作者中的排名。工作者的具体角色可以在WorkerSpec
中找到。 -
LOCAL_WORLD_SIZE
- 本地运行的工作者数量(例如,本地的世界大小);等于在torchrun
中指定的--nproc-per-node
参数。 -
WORLD_SIZE
- 作业中总的工作进程数。 -
ROLE_WORLD_SIZE
- 指定具有相同角色的工作进程中总的数量,该角色是在WorkerSpec
中定义的。 -
MASTER_ADDR
- 运行 rank 为 0 的 worker 的主机的完全限定域名(FQDN),用于初始化 Torch 分布式后端。 -
MASTER_PORT
- 在MASTER_ADDR
上用于托管 C10d TCP 存储的端口。 -
TORCHELASTIC_RESTART_COUNT
- 工作组迄今为止的重启次数。 -
TORCHELASTIC_MAX_RESTARTS
- 最大允许的重启次数(配置值)。 -
TORCHELASTIC_RUN_ID
- 等于会话的run_id
(例如,唯一的工作ID)。 -
PYTHON_EXEC
- 系统可执行文件的覆盖选项。如果提供,python 用户脚本将使用PYTHON_EXEC
的值作为可执行文件。默认情况下使用 sys.executable。
部署
-
(可选,仅适用于非C10d后端)启动会话后台服务器,并获取其终端地址,以便将其作为
--rdzv-endpoint
参数传递给启动脚本。 -
单节点多工作者:在主机上启动启动程序,以启动负责创建和监控本地工作者组的代理进程。
-
多节点多工作者:在所有参与训练的节点上使用相同参数启动启动程序。
当使用作业或集群管理器时,多节点作业的入口点命令应该是这个启动程序。
故障模式
-
Worker 失败:对于一个拥有
n
个工人的训练任务,如果k<=n
个工人失败,则所有工人将被停止并重新启动,最多可以重启max_restarts
次。 -
代理故障:代理故障会导致本地工作小组失败。此时,作业管理器可以选择让整个作业失效(集群语义),或者尝试替换故障节点。代理支持这两种处理方式。
-
节点故障:与代理故障一样。
会员变动
-
节点离开(缩放缩小):代理会收到即将有节点离开的通知,所有现有的工作者将停止运行。然后会形成一个新的
WorkerGroup
,并且所有的工作者将以新的RANK
和WORLD_SIZE
重新启动。 -
节点到达(扩展):新节点被加入作业中,所有现有的工作者停止运行,然后形成一个新的
WorkerGroup
,所有的工作者将以新的RANK
和WORLD_SIZE
重新启动。
重要通知
-
此工具和多进程分布式的 GPU 训练(无论是单节点还是多节点),目前只能通过使用 NCCL 分布式后端来实现最佳性能。因此,建议在进行 GPU 训练时使用 NCCL 后端。
-
此模块提供了初始化Torch进程组所需的环境变量,无需手动设置
RANK
。要在训练脚本中初始化一个进程组,只需执行以下操作:
>>> import torch.distributed as dist >>> dist.init_process_group(backend="gloo|nccl")
-
在你的训练程序中,你可以选择使用常规的分布式函数或
torch.nn.parallel.DistributedDataParallel()
模块。如果你的训练程序使用 GPU 并希望采用torch.nn.parallel.DistributedDataParallel()
模块,可以参考以下配置方法。
local_rank = int(os.environ["LOCAL_RANK"]) model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)
请确保将 device_ids
参数设置为你的代码将要操作的唯一 GPU 设备 ID,这通常是进程的本地排名。换句话说,device_ids
应该是 [int(os.environ("LOCAL_RANK"))]
,而 output_device
需要是 int(os.environ("LOCAL_RANK"))
才能使用此工具。
-
在失败或成员变更时,所有存活的工作进程将立即被终止。请确保定期保存进度检查点。检查点的频率应根据您的任务对工作丢失的容忍程度来确定。
-
此模块仅支持相同的
LOCAL_WORLD_SIZE
。也就是说,假设所有节点在每个角色上运行相同数量的本地工作者。 -
RANK
是不稳定的。在重启时,节点上的本地工作者可能会被分配不同的排名范围。永远不要硬编码关于排名稳定性的假设,也不要假设RANK
和LOCAL_RANK
之间存在某种关联。 -
当使用弹性设置(
min_size != max_size
)时,不要硬编码有关WORLD_SIZE
的假设,因为节点的加入和离开会导致世界大小发生变化。 -
建议你的脚本采用如下结构:
def main(): load_checkpoint(checkpoint_path) initialize() train() def train(): for batch in iter(dataset): train_step(batch) if should_checkpoint: save_checkpoint(checkpoint_path)
-
(推荐) 当出现工作节点错误时,此工具会总结错误详情(如时间、排名、主机、进程ID和跟踪回溯等)。在每个节点上,按时间戳排序的第一个错误会被视为“根本原因”错误。为了在错误摘要中包含跟踪回溯信息,请参考下面的例子,在训练脚本的主要入口函数添加装饰器。如果没有添加装饰器,则摘要仅会显示退出码而不会包含异常的跟踪回溯信息。有关 torchelastic 错误处理的详细信息,请参阅:https://pytorch.org/docs/stable/elastic/errors.html
from torch.distributed.elastic.multiprocessing.errors import record @record def main(): # do train pass if __name__ == "__main__": main()