会面
在Torch分布式弹性的上下文中,我们使用术语rendezvous来描述一种特定的功能,它结合了分布式同步和对等发现。
它被Torch分布式弹性用来聚集参与训练任务的各个节点,确保所有节点对参与者的名单和各自的角色达成一致,并共同决定何时开始或恢复训练。
Torch分布式弹性会合提供了以下关键功能:
屏障:
执行会合的节点将一直阻塞,直到达到最小数量的节点(即min
个节点)加入会合屏障为止(针对同一任务)。这意味着屏障的规模可能不是固定的。
在达到min
数量的节点之后,还会有短暂的等待时间,以确保所有试图在同一时间加入的节点都不会被遗漏,从而防止会合过程过快完成。
当屏障处的节点数量达到max
时,会立即完成会合。
还有一个总体超时设置,如果节点数量从未达到min
,则会导致会合失败。这一设置旨在作为一个简单的故障安全机制,在资源管理器出现问题的情况下帮助释放部分分配的作业资源,并且表示该操作不可重试。
排他性:
一个简单的分布式屏障是不够的,因为我们需要确保在同一时间内只有一个节点组存在(针对特定任务而言)。换句话说,新加入的节点不应该能够为同一个任务创建一个新的、独立的工作组。
Torch 分布式弹性会合确保如果一组节点已经完成了会合(即已经开始训练),则其他迟来的节点在尝试加入时只能处于等待状态,直到原有的会合被结束才能继续。
一致性:
当一个会合完成后,所有成员将达成一致,确定工作的成员资格及各自的角色。每个角色用一个介于0和world size之间的整数rank来表示。
请注意,排名是不稳定的,这意味着在下一个(重新)会合中,同一节点可能获得不同的排名。
容错:
Torch 分布式弹性机制旨在容忍会话建立过程中的节点故障。如果在此期间某个进程崩溃(或失去网络连接等),剩余的健康节点将会自动重新建立会话。
一个节点也可能在完成会合(或者被其他节点观察到已完成后)发生故障——这种情况将由Torch分布式弹性的train_loop
来处理,并且还会触发重新会合。
共享键值存储:
当会话完成后,会创建并返回一个共享的键值存储。该存储实现了一个 torch.distributed.Store
API(参见分布式通信文档)。
此存储仅由已完成会合的成员共享,用于Torch分布式弹性交换初始化作业控制和数据平面所需的信息。
等待的工人和会合点关闭:
Torch 分布式弹性会话处理对象提供了一些额外的功能,这些功能从技术上讲不属于会话建立过程。
-
查询有多少工作者在屏障处迟到,并确定他们能否参加下一次会合。
-
将“会合”状态设为closed,指示所有节点不参加下一次会合。
DynamicRendezvousHandler:
Torch Distributed Elastic 提供了 DynamicRendezvousHandler
类,实现了上述描述的会面机制。这是一个与后端无关的类型,在构建时需要指定一个特定的 RendezvousBackend
实例。
Torch分布式用户可以自行实现后端类型,或选择使用PyTorch自带的以下实现之一:
-
C10dRendezvousBackend
: 使用 C10d 存储(默认为TCPStore
)作为会合后端。使用 C10d 存储的主要优势在于它不需要任何第三方依赖(如 etcd)来建立会合。 -
EtcdRendezvousBackend
: 取代了传统的EtcdRendezvousHandler
类。将EtcdRendezvousBackend
实例传递给DynamicRendezvousHandler
在功能上等同于创建一个EtcdRendezvousHandler
实例。store = TCPStore("localhost") backend = C10dRendezvousBackend(store, "my_run_id") rdzv_handler = DynamicRendezvousHandler.from_backend( run_id="my_run_id", store=store, backend=backend, min_nodes=2, max_nodes=4 )
以下是描述rendezvous工作机制的状态图。

注册表
- 类torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, \*\*kwargs)[源代码]
-
保存构建
RendezvousHandler
所需的相关参数。- 参数
- 类torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[源代码]
-
表示
RendezvousHandler
后端的注册表。
处理器
- 类torch.distributed.elastic.rendezvous.RendezvousHandler[源代码]
-
主要对接界面。
注意
分布式 Torch 用户通常无需实现自己的
RendezvousHandler
。系统已提供了基于 C10d Store 的实现,推荐大多数用户使用。- abstractget_run_id()[源代码]
-
返回会合的运行 ID。
运行 ID 是一个由用户定义的标识符,用于唯一标识分布式应用的一个实例。它通常对应于作业 ID,并帮助节点加入正确的分布式应用程序。
- 返回类型
- abstractis_closed()[源代码]
-
检查会合地点是否已经关闭。
一旦达到封闭的会合点,同一任务中所有的再次会合尝试都将失败。
is_closed()
和set_closed()
具有最终传播的语义,不应用于同步。其意图是:如果至少有一个节点认为作业已完成,则会关闭 rendezvous,其他节点很快也会观察到这一点并停止运行。- 返回类型
- abstractnext_rendezvous()[源代码]
-
会合屏障的主要入口。
一直阻塞,直到会合完成并包含当前进程于形成的工作者组中,或者出现超时,或者会合被标记为已关闭。
- 返回值
-
RendezvousInfo
的实例。 - 抛出异常
-
-
RendezvousClosedError – 建立连接的会话已经关闭。
-
RendezvousConnectionError — 连接至rendezvous后端失败。
-
RendezvousStateError – 会话状态损坏。
-
RendezvousTimeoutError — 会话超时,未能按时完成。
-
- 返回类型
- abstractnum_nodes_waiting()[源代码]
-
返回在会合屏障迟到的节点数量,因为这些节点没有被包含在当前的工作节点组中。
调用者应定期调用此方法,以检查是否有新的节点等待加入任务。如果有,则通过调用
next_rendezvous()
(重新会合)来接受这些新节点。- 返回类型
- abstractset_closed()[源代码]
-
将会面标记为结束。
- abstractshutdown()[源代码]
-
关闭所有为会合开启的资源。
示例:
rdzv_handler = ... try: store, rank, world_size = rdzv_handler.next_rendezvous() finally: rdzv_handler.shutdown()
- 返回类型
- 属性use_agent_store:bool
-
表示由
next_rendezvous()
方法返回的 store 引用可以与用户应用程序共享,并在整个应用程序生命周期内保持有效。会话处理程序实现将通过
RendezvousStoreInfo
实例共享存储详情。应用程序通常使用 MASTER_ADDR 和 MASTER_PORT 环境变量来查找存储。
数据类
- 类torch.distributed.elastic.rendezvous.RendezvousInfo(store, rank, world_size, bootstrap_store_info)[源代码]
-
存储关于会合的信息。
异常
- 类torch.distributed.elastic.rendezvous.api.RendezvousError[源代码]
-
代表会合错误的基本类型。
- 类torch.distributed.elastic.rendezvous.api.RendezvousClosedError[源代码]
-
在会合关闭时触发事件。
- 类torch.distributed.elastic.rendezvous.api.RendezvousTimeoutError[源代码]
-
当会合未能按时完成时触发。
- 类torch.distributed.elastic.rendezvous.api.RendezvousConnectionError[源代码]
-
当连接到会话后端失败时触发。
- 类torch.distributed.elastic.rendezvous.api.RendezvousStateError[源代码]
-
当会合状态受损时触发。
- 类torch.distributed.elastic.rendezvous.api.RendezvousGracefulExitError[源代码]
-
当节点未被纳入会合时触发,并优雅地退出。
异常是一种用于退出堆栈的机制,但这并不表示失败。
实施
动态会合
- torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[源代码]
-
使用指定的参数创建一个新的
DynamicRendezvousHandler
。- 参数
-
-
store (Store) - 作为rendezvous的一部分返回的C10d存储。
-
backend (RendezvousBackend) – 用于持有会话状态的后端。
-
- 返回类型
参数
描述
加入超时时间
会合预计完成的总时间(以秒为单位)。默认值为600秒。
上次呼叫超时
在达到最少节点数量后,完成会合之前的额外等待时间(以秒为单位)。默认为30秒。
关闭超时时间
从调用
RendezvousHandler.set_closed()
或RendezvousHandler.shutdown()
开始,预期会话在多少秒后关闭。默认值为30秒。
- 类torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[源代码]
-
表示一个处理程序,用于在多个节点之间建立会合点。
- classmethodfrom_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None)[源代码]
-
创建一个新的
DynamicRendezvousHandler
。- 参数
-
-
run_id (str) – 会面的运行标识符。
-
store (Store) - 作为rendezvous的一部分返回的C10d存储。
-
backend (RendezvousBackend) – 用于持有会话状态的后端。
-
min_nodes (int) – 会话接受的最少节点数。
-
max_nodes (int) – 指定允许加入会合的最多节点数量。
-
timeout (Optional[RendezvousTimeout]) – 会话的超时设置。
-
- 类torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[源代码]
-
表示一个保存会合状态的后端。
- abstractget_state()[源代码]
-
获取会合的状态。
- 返回值
-
一个包含编码的会合状态及其栅栏令牌的元组,如果没有在后端找到任何状态,则返回
None
。 - 抛出异常
-
-
RendezvousConnectionError — 与后端的连接失败。
-
RendezvousStateError – 会话状态损坏。
-
- 返回类型
- 抽象属性 名称:str
-
获取后台的名称。
- abstractset_state(state, token=None)[源代码]
-
设定会合状态。
新的会合状态根据条件进行设置:
-
如果指定的
token
与后端存储的围栏令牌匹配,则会更新状态,并将新的状态和其对应的围栏令牌返回给调用者。 -
如果指定的
token
与后端存储的 fencing token 不匹配,状态将不会被更新;相反,会将当前的状态和其 fencing token 返回给调用者。 -
如果指定的
token
为None
,则只有在后端没有现有状态的情况下才会设置新状态。然后将新的状态或者现有的状态及其 fencing token 返回给调用者。
- 参数
- 返回值
-
一个元组,包含序列化的会合状态、对应的围栏令牌以及一个布尔值,表示设置尝试是否成功。
- 抛出异常
-
-
RendezvousConnectionError — 与后端的连接失败。
-
RendezvousStateError – 会话状态损坏。
-
- 返回类型
-
- 类torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[源代码]
-
保留会合的超时设置。
- 参数
- 属性 close:timedelta
-
获取连接关闭之前的超时时长。
- 属性heartbeat:timedelta
-
获取保持连接活跃的心跳超时时间。
- property join: timedelta
-
获取join超时时间。
- 属性< span>last_call:timedelta
-
获取上次调用的超时时间。
C10d 后端
- torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[源代码]
-
使用指定的参数创建一个新的
C10dRendezvousBackend
。参数
描述
商店类型
C10d存储的类型包括“tcp”和“file”。其中,“tcp”对应
torch.distributed.TCPStore
,而“file”对应torch.distributed.FileStore
。默认类型为“tcp”。读取超时时间
存储操作的读取超时时间(以秒为单位),默认为60秒。
请注意,这仅适用于
torch.distributed.TCPStore
。对于不接受超时参数的torch.distributed.FileStore
,此设置无关紧要。是否为主办人
一个布尔值,表示此后端实例是否托管C10d存储。如果未明确指定,则通过将此机器的主机名或IP地址与指定的会合点进行匹配来推断该值。默认值为
None
。请注意,此配置选项仅适用于
torch.distributed.TCPStore
。在正常情况下可以安全忽略它;唯一需要使用它的情况是其值无法正确确定(例如,会话端点具有CNAME作为主机名或与机器的FQDN不匹配)。- 返回类型
Etcd 后端
- torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[源代码]
-
使用指定的参数创建一个新的
EtcdRendezvousBackend
。参数
描述
读取超时时间
etcd 操作的读取超时时间(以秒为单位),默认为 60 秒。
协议
用于与etcd通信的协议。有效值为“http”和“https”,默认为“http”。
SSL证书
与HTTPS一起使用的SSL客户端证书的路径。默认值为
None
。SSL 证书密钥
用于与HTTPS配合使用的SSL客户端证书的私钥路径。默认值为
None
。CA证书
指向根SSL授权证书的路径。默认值为
None
。- 返回类型
Etcd 会面( Legacy)
警告
The DynamicRendezvousHandler
class replaces the EtcdRendezvousHandler
class and is recommended for most users. The EtcdRendezvousHandler
class is currently in maintenance mode and will be deprecated in the future.
- 类torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl, local_addr)[源代码]
-
实现一个由
torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous
支持的torch.distributed.elastic.rendezvous.RendezvousHandler
接口。EtcdRendezvousHandler
使用 URL 来配置 rendezvous 类型,并传递特定于实现的配置给 rendezvous 模块。基本的 etcd rendezvous 配置 URL 如下所示:etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers> # noqa: W605 -- example -- etcd://localhost:2379/1234?min_workers=1&max_workers=3
上述 URL 的含义如下:
-
使用与
etcd
方案注册的会合处理程序 -
要使用的
etcd
端点是localhost:2379
-
job_id == 1234
用作 etcd 中的前缀。这样可以在多个作业之间共享同一个 etcd 服务器,前提是每个作业的job_ids
必须是唯一的。需要注意的是,作业 ID 可以是任何字符串(不一定是数字),只要它是唯一的即可。 -
min_workers=1
和max_workers=3
指定了成员数量的范围。Torch 分布式弹性会在集群大小大于或等于min_workers
时开始运行作业,并允许最多max_workers
的节点加入集群。
以下是可传递给etcd rendezvous的所有参数的完整列表:
参数
描述
最小工作线程数
确保会话有效的最少工人数
最大工作线程数
最大允许的工人数
超时
next_rendvezvous 预期成功的总超时时间(默认 600 秒)
上次呼叫超时
达到最少工人数后的额外等待时间(“最后呼叫”)(默认为30秒)
etcd 前缀
路径前缀(从etcd根目录开始),在此范围内创建所有etcd节点(默认为
/torchelastic/p2p
) -
Etcd 存储
当使用 etcd 作为会面后端时,EtcdStore
是由 next_rendezvous()
返回的 C10d Store
实例类型。
- 类torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)[源代码]
-
利用rendezvous etcd实例来实现c10 Store接口。
这是由
EtcdRendezvous
返回的 store 对象。- get(key)[源代码]
-
通过键获取值,可能需要等待一段时间。
如果键不存在,将会进行阻塞等待,最长等待时间是
timeout
,或者直到键被发布为止。- 返回值
-
值
(字节)
- 抛出异常
-
LookupError - 如果在超时后键仍未发布 –
- 返回类型
- set(key, value)[源代码]
-
将键值对写入
EtcdStore
。键和值都既可以是 Python
str
,也可以是bytes
。
- wait(keys, override_timeout=None)[源代码]
-
等待所有密钥发布,或直到超时。
- 抛出异常
-
LookupError - 超时发生时
Etcd 服务器
EtcdServer
是一个方便的类,它使你在子进程中轻松启动和停止 etcd 服务器。这对于测试或单节点(多工作者)部署非常有用,在这些场景中手动设置 etcd 服务器会比较麻烦。
警告
- 类torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[源代码]
-
注意
已在版本 3.4.3 的 etcd 服务器上通过测试。
在随机可用的端口上启动和停止一个独立的本地 etcd 服务器。这对于单节点或多工作进程启动以及测试场景非常有用,在这些情况下,使用 sidecar 式的 etcd 服务器要比单独设置 etcd 服务器更加方便。
此类注册了一个终止处理器,在退出时关闭etcd子进程。但请注意,这并不是调用
stop()
方法的替代方案。以下备用机制用于查找 etcd 二进制文件:
-
使用 TORCHELASTIC_ETCD_BINARY_PATH 环境变量
-
如果存在,使用
<文件根目录>/bin/etcd
-
使用来自
PATH
中的etcd
使用方法
server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd") server.start() client = server.get_client() # use client server.stop()
- 参数
-
etcd_binary_path - etcd服务器二进制文件的路径(参见上文中的备用路径)
-