会面

在Torch分布式弹性的上下文中,我们使用术语rendezvous来描述一种特定的功能,它结合了分布式同步对等发现

它被Torch分布式弹性用来聚集参与训练任务的各个节点,确保所有节点对参与者的名单和各自的角色达成一致,并共同决定何时开始或恢复训练。

Torch分布式弹性会合提供了以下关键功能:

屏障:

执行会合的节点将一直阻塞,直到达到最小数量的节点(即min个节点)加入会合屏障为止(针对同一任务)。这意味着屏障的规模可能不是固定的。

在达到min数量的节点之后,还会有短暂的等待时间,以确保所有试图在同一时间加入的节点都不会被遗漏,从而防止会合过程过快完成。

当屏障处的节点数量达到max时,会立即完成会合。

还有一个总体超时设置,如果节点数量从未达到min,则会导致会合失败。这一设置旨在作为一个简单的故障安全机制,在资源管理器出现问题的情况下帮助释放部分分配的作业资源,并且表示该操作不可重试。

排他性:

一个简单的分布式屏障是不够的,因为我们需要确保在同一时间内只有一个节点组存在(针对特定任务而言)。换句话说,新加入的节点不应该能够为同一个任务创建一个新的、独立的工作组。

Torch 分布式弹性会合确保如果一组节点已经完成了会合(即已经开始训练),则其他迟来的节点在尝试加入时只能处于等待状态,直到原有的会合被结束才能继续。

一致性:

当一个会合完成后,所有成员将达成一致,确定工作的成员资格及各自的角色。每个角色用一个介于0和world size之间的整数rank来表示。

请注意,排名是不稳定的,这意味着在下一个(重新)会合中,同一节点可能获得不同的排名。

容错:

Torch 分布式弹性机制旨在容忍会话建立过程中的节点故障。如果在此期间某个进程崩溃(或失去网络连接等),剩余的健康节点将会自动重新建立会话。

一个节点也可能在完成会合(或者被其他节点观察到已完成后)发生故障——这种情况将由Torch分布式弹性的train_loop来处理,并且还会触发重新会合。

共享键值存储:

当会话完成后,会创建并返回一个共享的键值存储。该存储实现了一个 torch.distributed.Store API(参见分布式通信文档)。

此存储仅由已完成会合的成员共享,用于Torch分布式弹性交换初始化作业控制和数据平面所需的信息。

等待的工人和会合点关闭:

Torch 分布式弹性会话处理对象提供了一些额外的功能,这些功能从技术上讲不属于会话建立过程。

  1. 查询有多少工作者在屏障处迟到,并确定他们能否参加下一次会合。

  2. 将“会合”状态设为closed,指示所有节点不参加下一次会合。

DynamicRendezvousHandler:

Torch Distributed Elastic 提供了 DynamicRendezvousHandler 类,实现了上述描述的会面机制。这是一个与后端无关的类型,在构建时需要指定一个特定的 RendezvousBackend 实例。

Torch分布式用户可以自行实现后端类型,或选择使用PyTorch自带的以下实现之一:

以下是描述rendezvous工作机制的状态图。

{BASE_RAW_UPLOAD_URL}/pytorch-doc-2.5/a4f340b7b8b23980c15963971845ffeb.png

注册表

torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, \*\*kwargs)[源代码]

保存构建RendezvousHandler所需的相关参数。

参数
  • backend (str) – 处理会合所使用的后端名称。

  • endpoint (str) – 会面的端点,通常形式为 <hostname>[:<port>]。

  • run_id (str) – 会合的标识符。

  • min_nodes (int) – 会话接受的最少节点数。

  • max_nodes (int) – 指定允许加入会合的最多节点数量。

  • local_addr (Optional[str]) – 本地节点的地址。

  • **kwargs - 为指定后端提供的额外参数。

get(key, default=None)[源代码]

如果 key 存在,则返回 key 的值,否则返回 default

返回类型

Any

get_as_bool(key, default=None)[源代码]

key 的值以 bool 类型返回。

返回类型

Optional[bool]

get_as_int(key, default=None)[源代码]

key 的值以 int 类型返回。

返回类型

Optional[int]

torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[源代码]

表示 RendezvousHandler 后端的注册表。

处理器

torch.distributed.elastic.rendezvous.RendezvousHandler[源代码]

主要对接界面。

注意

分布式 Torch 用户通常无需实现自己的 RendezvousHandler。系统已提供了基于 C10d Store 的实现,推荐大多数用户使用。

abstractget_backend()[源代码]

返回会合后台的名称。

返回类型

str

abstractget_run_id()[源代码]

返回会合的运行 ID。

运行 ID 是一个由用户定义的标识符,用于唯一标识分布式应用的一个实例。它通常对应于作业 ID,并帮助节点加入正确的分布式应用程序。

返回类型

str

abstractis_closed()[源代码]

检查会合地点是否已经关闭。

一旦达到封闭的会合点,同一任务中所有的再次会合尝试都将失败。

is_closed()set_closed() 具有最终传播的语义,不应用于同步。其意图是:如果至少有一个节点认为作业已完成,则会关闭 rendezvous,其他节点很快也会观察到这一点并停止运行。

返回类型

bool

abstractnext_rendezvous()[源代码]

会合屏障的主要入口。

一直阻塞,直到会合完成并包含当前进程于形成的工作者组中,或者出现超时,或者会合被标记为已关闭。

返回值

RendezvousInfo 的实例。

抛出异常
返回类型

RendezvousInfo

abstractnum_nodes_waiting()[源代码]

返回在会合屏障迟到的节点数量,因为这些节点没有被包含在当前的工作节点组中。

调用者应定期调用此方法,以检查是否有新的节点等待加入任务。如果有,则通过调用next_rendezvous()(重新会合)来接受这些新节点。

返回类型

int

abstractset_closed()[源代码]

将会面标记为结束。

abstractshutdown()[源代码]

关闭所有为会合开启的资源。

示例:

rdzv_handler = ...
try:
    store, rank, world_size = rdzv_handler.next_rendezvous()
finally:
    rdzv_handler.shutdown()
返回类型

bool

属性use_agent_store:bool

表示由next_rendezvous() 方法返回的 store 引用可以与用户应用程序共享,并在整个应用程序生命周期内保持有效。

会话处理程序实现将通过 RendezvousStoreInfo 实例共享存储详情。应用程序通常使用 MASTER_ADDRMASTER_PORT 环境变量来查找存储。

数据类

torch.distributed.elastic.rendezvous.RendezvousInfo(store, rank, world_size, bootstrap_store_info)[源代码]

存储关于会合的信息。

torch.distributed.elastic.rendezvous.api.RendezvousStoreInfo(master_addr, master_port)[源代码]

存储用于初始化训练器分布式通信的地址和端口

静态build(rank, store)[源代码]

工厂方法在rank0主机上查找未使用的端口,并获取所有rank的地址和端口信息。

如果已知 master_addr 和 master_port(在共享现有 tcp 存储服务器时非常有用),请使用构造函数。

参数
  • rank (int) – 表示当前节点的等级

  • store (Store) - 用于会合的存储对象

  • local_addr (Optional[str]) – 当前节点的地址,如果没有提供将从主机名解析出来

返回类型

RendezvousStoreInfo

异常

torch.distributed.elastic.rendezvous.api.RendezvousError[源代码]

代表会合错误的基本类型。

torch.distributed.elastic.rendezvous.api.RendezvousClosedError[源代码]

在会合关闭时触发事件。

torch.distributed.elastic.rendezvous.api.RendezvousTimeoutError[源代码]

当会合未能按时完成时触发。

torch.distributed.elastic.rendezvous.api.RendezvousConnectionError[源代码]

当连接到会话后端失败时触发。

torch.distributed.elastic.rendezvous.api.RendezvousStateError[源代码]

当会合状态受损时触发。

torch.distributed.elastic.rendezvous.api.RendezvousGracefulExitError[源代码]

当节点未被纳入会合时触发,并优雅地退出。

异常是一种用于退出堆栈的机制,但这并不表示失败。

实施

动态会合

torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[源代码]

使用指定的参数创建一个新的DynamicRendezvousHandler

参数
  • store (Store) - 作为rendezvous的一部分返回的C10d存储。

  • backend (RendezvousBackend) – 用于持有会话状态的后端。

返回类型

DynamicRendezvousHandler

参数

描述

加入超时时间

会合预计完成的总时间(以秒为单位)。默认值为600秒。

上次呼叫超时

在达到最少节点数量后,完成会合之前的额外等待时间(以秒为单位)。默认为30秒。

关闭超时时间

从调用 RendezvousHandler.set_closed()RendezvousHandler.shutdown() 开始,预期会话在多少秒后关闭。默认值为30秒。

torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[源代码]

表示一个处理程序,用于在多个节点之间建立会合点。

classmethodfrom_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None)[源代码]

创建一个新的 DynamicRendezvousHandler

参数
  • run_id (str) – 会面的运行标识符。

  • store (Store) - 作为rendezvous的一部分返回的C10d存储。

  • backend (RendezvousBackend) – 用于持有会话状态的后端。

  • min_nodes (int) – 会话接受的最少节点数。

  • max_nodes (int) – 指定允许加入会合的最多节点数量。

  • local_addr (Optional[str]) – 本地节点的地址。

  • timeout (Optional[RendezvousTimeout]) – 会话的超时设置。

torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[源代码]

表示一个保存会合状态的后端。

abstractget_state()[源代码]

获取会合的状态。

返回值

一个包含编码的会合状态及其栅栏令牌的元组,如果没有在后端找到任何状态,则返回None

抛出异常
返回类型

Optional[Tuple[bytes, Any]]

抽象属性 名称:str

获取后台的名称。

abstractset_state(state, token=None)[源代码]

设定会合状态。

新的会合状态根据条件进行设置:

  • 如果指定的 token 与后端存储的围栏令牌匹配,则会更新状态,并将新的状态和其对应的围栏令牌返回给调用者。

  • 如果指定的 token 与后端存储的 fencing token 不匹配,状态将不会被更新;相反,会将当前的状态和其 fencing token 返回给调用者。

  • 如果指定的 tokenNone,则只有在后端没有现有状态的情况下才会设置新状态。然后将新的状态或者现有的状态及其 fencing token 返回给调用者。

参数
  • state (bytes) – 代表编码后的会合状态。

  • token (Optional[Any]) – 由先前对 get_state()set_state() 的调用检索到的可选封条令牌。

返回值

一个元组,包含序列化的会合状态、对应的围栏令牌以及一个布尔值,表示设置尝试是否成功。

抛出异常
返回类型

Optional[Tuple[bytes, Any, bool]]

torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[源代码]

保留会合的超时设置。

参数
  • join (Optional[timedelta]) – 会合预计完成的时间间隔。

  • last_call (Optional[timedelta]) – 在会合达到最少所需参与者数量后,完成会合前的额外等待时间。

  • close (Optional[timedelta]) – 在调用 RendezvousHandler.set_closed()RendezvousHandler.shutdown() 之后,会面预计在什么时间内结束。

  • keep_alive – 在此时间内预期完成保持连接活跃的心跳。

属性 close:timedelta

获取连接关闭之前的超时时长。

属性heartbeat:timedelta

获取保持连接活跃的心跳超时时间。

property join: timedelta

获取join超时时间。

属性< span>last_call:timedelta

获取上次调用的超时时间。

C10d 后端

torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[源代码]

使用指定的参数创建一个新的C10dRendezvousBackend

参数

描述

商店类型

C10d存储的类型包括“tcp”和“file”。其中,“tcp”对应torch.distributed.TCPStore,而“file”对应torch.distributed.FileStore。默认类型为“tcp”。

读取超时时间

存储操作的读取超时时间(以秒为单位),默认为60秒。

请注意,这仅适用于torch.distributed.TCPStore。对于不接受超时参数的torch.distributed.FileStore,此设置无关紧要。

是否为主办人

一个布尔值,表示此后端实例是否托管C10d存储。如果未明确指定,则通过将此机器的主机名或IP地址与指定的会合点进行匹配来推断该值。默认值为None

请注意,此配置选项仅适用于torch.distributed.TCPStore。在正常情况下可以安全忽略它;唯一需要使用它的情况是其值无法正确确定(例如,会话端点具有CNAME作为主机名或与机器的FQDN不匹配)。

返回类型

Tuple[C10dRendezvousBackend, Store]

torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend(store, run_id)[源代码]

表示一个由 C10d 支持的会话后端。

参数
  • store (Store) – 用于与 C10d 存储通信的 torch.distributed.Store 实例。

  • run_id (str) – 会面的运行标识符。

get_state()[源代码]

参见基类。

返回类型

Optional[Tuple[bytes, Any]]

属性名: str

参见基类。

set_state(state, token=None)[源代码]

参见基类。

返回类型

Optional[Tuple[bytes, Any, bool]]

Etcd 后端

torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[源代码]

使用指定的参数创建一个新的 EtcdRendezvousBackend

参数

描述

读取超时时间

etcd 操作的读取超时时间(以秒为单位),默认为 60 秒。

协议

用于与etcd通信的协议。有效值为“http”和“https”,默认为“http”。

SSL证书

与HTTPS一起使用的SSL客户端证书的路径。默认值为None

SSL 证书密钥

用于与HTTPS配合使用的SSL客户端证书的私钥路径。默认值为None

CA证书

指向根SSL授权证书的路径。默认值为None

返回类型

Tuple[EtcdRendezvousBackend, Store]

torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.EtcdRendezvousBackend(client, run_id, key_prefix=None, ttl=None)[源代码]

代表基于etcd的会面后端。

参数
  • client (Client) – 用于与 etcd 进行通信的 etcd.Client 实例。

  • run_id (str) – 会面的运行标识符。

  • key_prefix (Optional[str]) – 存储 rendezvous 状态的 etcd 路径。

  • ttl (Optional[int]) – 会合状态的生存时间(TTL)。若未指定,默认为两小时。

get_state()[源代码]

参见基类。

返回类型

Optional[Tuple[bytes, Any]]

属性名: str

参见基类。

set_state(state, token=None)[源代码]

参见基类。

返回类型

Optional[Tuple[bytes, Any, bool]]

Etcd 会面( Legacy)

警告

The DynamicRendezvousHandler class replaces the EtcdRendezvousHandler class and is recommended for most users. The EtcdRendezvousHandler class is currently in maintenance mode and will be deprecated in the future.

torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl, local_addr)[源代码]

实现一个由 torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous 支持的 torch.distributed.elastic.rendezvous.RendezvousHandler 接口。EtcdRendezvousHandler 使用 URL 来配置 rendezvous 类型,并传递特定于实现的配置给 rendezvous 模块。基本的 etcd rendezvous 配置 URL 如下所示:

etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers>  # noqa: W605

-- example --

etcd://localhost:2379/1234?min_workers=1&max_workers=3

上述 URL 的含义如下:

  1. 使用与 etcd 方案注册的会合处理程序

  2. 要使用的 etcd 端点是 localhost:2379

  3. job_id == 1234 用作 etcd 中的前缀。这样可以在多个作业之间共享同一个 etcd 服务器,前提是每个作业的 job_ids 必须是唯一的。需要注意的是,作业 ID 可以是任何字符串(不一定是数字),只要它是唯一的即可。

  4. min_workers=1max_workers=3 指定了成员数量的范围。Torch 分布式弹性会在集群大小大于或等于 min_workers 时开始运行作业,并允许最多 max_workers 的节点加入集群。

以下是可传递给etcd rendezvous的所有参数的完整列表:

参数

描述

最小工作线程数

确保会话有效的最少工人数

最大工作线程数

最大允许的工人数

超时

next_rendvezvous 预期成功的总超时时间(默认 600 秒)

上次呼叫超时

达到最少工人数后的额外等待时间(“最后呼叫”)(默认为30秒)

etcd 前缀

路径前缀(从etcd根目录开始),在此范围内创建所有etcd节点(默认为/torchelastic/p2p

Etcd 存储

当使用 etcd 作为会面后端时,EtcdStore 是由 next_rendezvous() 返回的 C10d Store 实例类型。

torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)[源代码]

利用rendezvous etcd实例来实现c10 Store接口。

这是由 EtcdRendezvous 返回的 store 对象。

add(key, num)[源代码]

原子地将一个值通过指定的整数增量进行增加。

整数以十进制字符串的形式表示。如果键不存在,默认值将被假设为0

返回值

新增的值

返回类型

int

check(keys)[源代码]

检查所有键是否立即可用(无需等待)。

返回类型

bool

get(key)[源代码]

通过键获取值,可能需要等待一段时间。

如果键不存在,将会进行阻塞等待,最长等待时间是 timeout,或者直到键被发布为止。

返回值

(字节)

抛出异常

LookupError - 如果在超时后键仍未发布

返回类型

字节(bytes)

set(key, value)[源代码]

将键值对写入 EtcdStore

键和值都既可以是 Python str,也可以是 bytes

wait(keys, override_timeout=None)[源代码]

等待所有密钥发布,或直到超时。

抛出异常

LookupError - 超时发生时

Etcd 服务器

EtcdServer 是一个方便的类,它使你在子进程中轻松启动和停止 etcd 服务器。这对于测试或单节点(多工作者)部署非常有用,在这些场景中手动设置 etcd 服务器会比较麻烦。

警告

在生产环境和多节点部署中,请确保部署一个高可用的etcd服务器,以避免它是分布式任务的单点故障。
torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[源代码]

注意

已在版本 3.4.3 的 etcd 服务器上通过测试。

在随机可用的端口上启动和停止一个独立的本地 etcd 服务器。这对于单节点或多工作进程启动以及测试场景非常有用,在这些情况下,使用 sidecar 式的 etcd 服务器要比单独设置 etcd 服务器更加方便。

此类注册了一个终止处理器,在退出时关闭etcd子进程。但请注意,这并不是调用stop()方法的替代方案。

以下备用机制用于查找 etcd 二进制文件:

  1. 使用 TORCHELASTIC_ETCD_BINARY_PATH 环境变量

  2. 如果存在,使用 <文件根目录>/bin/etcd

  3. 使用来自 PATH 中的 etcd

使用方法

server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
server.start()
client = server.get_client()
# use client
server.stop()
参数

etcd_binary_path - etcd服务器二进制文件的路径(参见上文中的备用路径)

本页目录