分布式RPC框架

分布式RPC框架通过一系列原语支持多机模型训练,实现远程通信,并提供高级API自动处理跨多台机器分布的模型。

警告

RPC包中的API是稳定的。目前有多项工作正在进行,以提升性能和改善错误处理,这些改进将包含在未来发布的版本中。

警告

CUDA支持在PyTorch 1.9版本中引入,并且仍然是一个测试版功能。RPC包中的某些特性尚未完全兼容CUDA,因此不建议使用这些特性。不受支持的特性包括:RRefs、JIT兼容性、分布式自动微分和优化器以及性能分析。这些问题将在未来的版本中得到解决。

注意

请参阅 PyTorch 分布式概述,了解与分布式训练相关的所有功能的简要介绍。

基础知识

分布式RPC框架使远程运行函数变得更加简便,支持在不复制实际数据的情况下引用远程对象,并提供autograd和优化器API,以便在RPC边界上透明地执行反向传播和参数更新。这些功能可以归类为四组API。

  1. 远程过程调用 (RPC) 支持在指定的目标工作进程中运行带有给定参数的函数,并获取返回值或者创建对返回值的引用。主要有三个 RPC API: rpc_sync()(同步),rpc_async()(异步)和remote()(异步并返回对远程返回值的引用)。如果用户代码在没有返回值的情况下无法继续执行,则使用同步 API。否则,可以使用异步 API 获取 future,并在需要时等待 future 的结果。当要求是在远程创建某些东西但永远不需要将其获取到调用者端时,remote() API 非常有用。例如,一个驱动进程正在设置参数服务器和训练器的情况。驱动程序可以在参数服务器上创建嵌入表,并将该表的引用与训练器共享,但自己永远不会在本地使用这个嵌入表。在这种情况下,rpc_sync()rpc_async() 已不再适用,因为它们总是暗示返回值将立即或在未来返回给调用者。

  2. 远程引用(RRef) 是一个指向本地或远程对象的分布式共享指针。它可以被其他工作者共享,并且引用计数会自动处理。每个 RRef 只有一个所有者,该对象仅在该所有者的机器上存在。非所有者的工作者可以通过显式请求从所有者那里获取对象副本。这在某个工作者需要访问某些数据对象但既不是创建者(remote() 的调用者)也不是该对象的所有者时非常有用。例如,分布式优化器就是这种应用场景的一个例子。

  3. 分布式自动微分将参与前向传递的所有工作节点上的本地自动微分引擎连接起来,并在反向传递期间自动与它们通信以计算梯度。这对于进行分布式模型并行训练、参数服务器训练等特别有用,因为这些场景下前向传递需要跨越多个机器。有了这个特性,用户代码不再需要担心如何跨RPC边界发送梯度以及本地自动微分引擎的启动顺序,在前向传递中存在嵌套和相互依赖的RPC调用时,这可能会变得相当复杂。

  4. 分布式优化器 的构造函数接受一个 Optimizer()(例如,SGD(), Adagrad() 等)和一个参数 RRefs 列表。它在每个不同的 RRef 所有者上创建一个 Optimizer() 实例,并在运行 step() 时根据需要更新参数。当你拥有分布式前向和后向传递时,参数和梯度将在多个工作者之间分散,因此每个涉及的工作者都需要一个优化器。分布式优化子将所有这些本地优化器打包成一个,并提供简洁的构造函数和 step() API。

远程过程调用

在使用 RPC 和分布式自动微分原语之前,必须进行初始化。为了初始化 RPC 框架,我们需要使用 init_rpc(),它会初始化 RPC 框架、RRef 框架和分布式自动微分。

torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)[源代码]

初始化RPC相关的原语,如本地RPC代理和分布式自动微分,使当前进程能够立即开始发送和接收RPC。

参数
  • name (str) – 此节点的全局唯一名称。(例如,Trainer3, ParameterServer2, Master, Worker1)名称只能包含数字、字母、下划线、冒号和/或破折号,并且长度不得超过128字符。

  • backend (BackendType, 可选) – 指定 RPC 后端实现的类型。默认值为 BackendType.TENSORPIPE。更多详细信息请参见后端

  • rank (int) – 此节点的全局唯一标识符(ID)和排名。

  • world_size (int) – 分组中工作者的数量。

  • rpc_backend_options (RpcBackendOptions, 可选) – 传递给 RpcAgent 构造函数的配置。它必须是 RpcBackendOptions 的代理特定子类,并包含代理特定的初始化设置。默认情况下,对于所有代理,默认超时时间为 60 秒,并使用环境变量 init_method = "env://" 初始化的基础进程组进行 rendezvous,这意味着需要正确设置环境变量 MASTER_ADDRMASTER_PORT。有关更多信息,请参阅后端以了解可用选项。

以下 API 允许用户远程执行函数并创建对远程数据对象的引用(RRefs)。当使用 Tensor 作为参数或返回值时,目标工作者会尝试创建具有相同元信息(如形状和步幅)的 Tensor。我们故意禁止传输 CUDA 张量,因为如果源和目标工作者的设备列表不匹配,则可能会导致程序崩溃。在这种情况下,应用程序可以在调用者处将输入张量显式地移动到 CPU,并在被调用者处根据需要将其移动到所需的设备。

警告

RPC中的TorchScript支持是一个实验性功能,可能会发生变化。从v1.5.0版本开始,torch.distributed.rpc 支持将TorchScript函数作为RPC目标进行调用,并且这有助于在被调用方提高并行性,因为执行TorchScript函数不需要GIL。

torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)[源代码]

向worker to 发起阻塞式RPC调用以运行函数 func。RPC消息会与Python代码的执行并行发送和接收。此方法是线程安全的。

参数
  • to (strWorkerInfoint) – 目标工作者的名称、排名或 WorkerInfo

  • func (Callable) – 一个可调用的函数,例如 Python 可调用对象、内置操作符(如 add())和注释过的 TorchScript 函数。

  • args (元组) —— func 调用的参数。

  • kwargs (dict) – 是一个包含func调用所需关键字参数的字典。

  • timeout (float, 可选) – 用于此 RPC 的超时时间(秒)。如果 RPC 在指定时间内未完成,则会引发一个表示已超时的异常。值为 0 表示无限超时,即永远不会触发超时错误。如果没有提供,默认值将在初始化时或通过 _set_rpc_timeout 方法设置。

返回值

返回 func 使用 argskwargs 参数执行的结果。

示例:

确保在两个工作者节点上正确设置了MASTER_ADDRMASTER_PORT。更多详情请参阅init_process_group() API。例如,

export MASTER_ADDR=localhost
export MASTER_PORT=5678

然后在两个不同的进程中运行以下代码:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

以下是一个使用 RPC 的 TorchScript 函数运行示例。

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)[源代码]

向 worker to 发起一个非阻塞的 RPC 调用以运行函数 func。RPC 消息与 Python 代码执行并行发送和接收,因此此方法是线程安全的。该方法会立即返回一个可以被等待的 Future 对象。

参数
  • to (strWorkerInfoint) – 目标工作者的名称、排名或 WorkerInfo

  • func (Callable) – 一个可调用的函数,例如 Python 可调用对象、内置操作符(如 add())和注释过的 TorchScript 函数。

  • args (元组) —— func 调用的参数。

  • kwargs (dict) – 是一个包含func调用所需关键字参数的字典。

  • timeout (float, 可选) – 用于此 RPC 的超时时间(秒)。如果 RPC 在指定时间内未完成,则会引发一个表示已超时的异常。值为 0 表示无限超时,即永远不会触发超时错误。如果没有提供,默认值将在初始化时或通过 _set_rpc_timeout 方法设置。

返回值

返回一个可以等待的Future对象。当任务完成后,可以通过该Future对象获取funcargskwargs上的返回值。

警告

由于我们不支持通过网络传输 GPU 张量,在 func 中使用 GPU 张量作为参数或返回值是不可行的。你需要先将 GPU 张量复制到 CPU 上,然后再将其用作 func 的参数或返回值。

警告

使用rpc_async API 时,在将参数张量发送到网络之前不会复制它们的存储,具体操作由不同的线程根据 RPC 后端类型来完成。调用者需要确保这些张量的内容在返回的Future 完成之前保持不变。

示例:

确保在两个工作者节点上正确设置了MASTER_ADDRMASTER_PORT。更多详情请参阅init_process_group() API。例如,

export MASTER_ADDR=localhost
export MASTER_PORT=5678

然后在两个不同的进程中运行以下代码:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
>>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
>>> result = fut1.wait() + fut2.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

以下是一个使用 RPC 的 TorchScript 函数运行示例。

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3))
>>> ret = fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)[源代码]

向 worker to 发起远程调用以执行函数 func,并立即返回一个指向结果值的 RRef。worker to 将成为该 RRef 的所有者,而发起调用的 worker 则是用户。所有者负责管理其 RRef 的全局引用计数,并且只有当全局范围内没有对该 RRef 的活动引用时,才会销毁该 RRef

参数
  • to (strWorkerInfoint) – 目标工作者的名称、排名或 WorkerInfo

  • func (Callable) – 一个可调用的函数,例如 Python 可调用对象、内置操作符(如 add())和注释过的 TorchScript 函数。

  • args (元组) —— func 调用的参数。

  • kwargs (dict) – 是一个包含func调用所需关键字参数的字典。

  • timeout (float, 可选) – 该远程调用的超时时间(以秒为单位)。如果在工作者to上创建此RRef的操作在此超时时间内未成功处理,则下次尝试使用该 RRef (例如调用 to_here())时,将引发一个超时异常表示此次失败。值为0表示无限超时,即永远不会触发超时错误。如果没有提供此参数,则使用初始化时设置的默认值或通过_set_rpc_timeout 设置的默认值。

返回值

创建一个用户 RRef 实例并将其绑定到结果值上。使用阻塞 API torch.distributed.rpc.RRef.to_here() 当地获取结果值。

警告

远程(remote)API 在将张量发送到网络之前不会复制参数张量的存储,具体由 RPC 后端类型决定使用哪个线程来完成。调用者需要确保这些张量的内容在返回的 RRef 被所有者确认之前保持不变,可以使用 torch.distributed.rpc.RRef.confirmed_by_owner() API 来检查。

警告

对于remote API 的超时等错误,我们采取尽力而为的处理方式。这意味着当由 remote 发起的远程调用失败(例如由于超时)时,我们将采用一种尽力而为的方式来处理这些错误。具体来说,错误会在异步的基础上被处理并设置在结果 RRef 上。如果在此处理之前应用程序尚未使用该 RRef(如 to_here 或 fork 调用),那么后续对 RRef 的使用将会正确地抛出错误。然而,也有可能用户的应用程序会在错误被处理前就使用了 RRef。在这种情况下,由于错误尚未被处理,因此可能不会抛出任何异常。

示例:

Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly
on both workers. Refer to :meth:`~torch.distributed.init_process_group`
API for more details. For example,

export MASTER_ADDR=localhost
export MASTER_PORT=5678

Then run the following code in two different processes:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>> x = rref1.to_here() + rref2.to_here()
>>> rpc.shutdown()

>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

Below is an example of running a TorchScript function using RPC.

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)

>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rref.to_here()
>>> rpc.shutdown()

>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.get_worker_info(worker_name=None)[源代码]

获取给定工作名称的WorkerInfo。使用此WorkerInfo以避免在每次调用时传递昂贵的字符串。

参数

worker_name (str) – 工作器的字符串名称。如果为 None,则返回当前工作器的 ID。(默认值为 None)

返回值

返回给定 worker_name 对应的 WorkerInfo 实例;如果 worker_nameNone,则返回当前工作者的 WorkerInfo

torch.distributed.rpc.shutdown(graceful=True, timeout=0)[源代码]

执行 RPC 代理的关闭,然后销毁 RPC 代理。这会停止本地代理接受未处理的请求,并通过终止所有 RPC 线程来关闭 RPC 框架。如果 graceful=True,将会阻塞直到所有的本地和远程 RPC 进程到达此方法并等待所有未完成的工作完成。否则,如果 graceful=False,这是一个本地关闭,并不会等待其他 RPC 进程到达此方法。

警告

对于由rpc_async() 返回的Future 对象,在调用 shutdown() 之后不应再调用 future.wait()

参数

graceful (bool) – 是否进行优雅关闭。如果为 True,将执行以下操作:1) 等待 UserRRefs 没有挂起的系统消息后再删除它们;2) 阻塞直到所有本地和远程 RPC 进程到达此方法,并等待所有未完成的工作完成。

示例:

确保在两个工作者节点上正确设置了MASTER_ADDRMASTER_PORT。更多详情请参阅init_process_group() API。例如,

export MASTER_ADDR=localhost
export MASTER_PORT=5678

然后在两个不同的进程中运行以下代码:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> # do some work
>>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
>>> # ready to shutdown
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> # wait for worker 0 to finish work, and then shutdown.
>>> rpc.shutdown()
torch.distributed.rpc.WorkerInfo

这是一个封装系统中工作者信息的结构,包含工作者的名字和ID。此类不应直接创建,而是通过get_worker_info() 获取其实例,并将其传递给诸如rpc_sync()rpc_async()remote() 等函数,以避免每次调用时复制字符串。

属性id

用于标识工作者的全局唯一编号。

属性名称

工人的名字。

RPC包还提供了装饰器,允许应用程序指定如何在被调用方处理给定的函数。

torch.distributed.rpc.functions.async_execution(fn)[源代码]

这是一个用于函数的装饰器,表示该函数返回值保证是一个 Future 对象,并且此函数可以在 RPC 调用方异步执行。具体来说,被调用方会提取由包装函数返回的 Future 对象,并将后续处理步骤作为对该 Future 的回调进行安装。当完成时,这些回调会从 Future 中读取值并将其发送回作为 RPC 响应。这意味着返回的 Future 对象仅存在于被调用方一侧,并且不会通过 RPC 传输。此装饰器在包装函数(fn)需要由于包含例如 rpc_async() 或等待其他信号等原因而暂停和恢复时非常有用。

注意

为了启用异步执行,应用程序必须将此装饰器返回的函数对象传递给 RPC API。如果 RPC 检测到该装饰器安装的属性,则会知道该函数返回一个 Future 对象,并进行相应处理。然而,这并不意味着这个装饰器在定义函数时必须是最外层的一个。例如,当与 @staticmethod@classmethod 结合使用时,@rpc.functions.async_execution 需要作为内层装饰器,以确保目标函数被识别为静态或类方法。尽管如此,该目标函数仍然可以异步执行,因为在访问时,静态或类方法会保留由 @rpc.functions.async_execution 安装的属性。

示例:

返回的Future对象可以来自rpc_async()then()Future构造函数。下面的示例直接使用由then()返回的Future对象。

>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> @rpc.functions.async_execution
>>> def async_add_chained(to, x, y, z):
>>>     # This function runs on "worker1" and returns immediately when
>>>     # the callback is installed through the `then(cb)` API. In the
>>>     # mean time, the `rpc_async` to "worker2" can run concurrently.
>>>     # When the return value of that `rpc_async` arrives at
>>>     # "worker1", "worker1" will run the lambda function accordingly
>>>     # and set the value for the previously returned `Future`, which
>>>     # will then trigger RPC to send the result back to "worker0".
>>>     return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>         lambda fut: fut.wait() + z
>>>     )
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add_chained,
>>>     args=("worker2", torch.ones(2), 1, 1)
>>> )
>>> print(ret)  # prints tensor([3., 3.])

当与TorchScript装饰器结合使用时,此装饰器必须是外部最外层的。

>>> from torch import Tensor
>>> from torch.futures import Future
>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> @torch.jit.script
>>> def script_add(x: Tensor, y: Tensor) -> Tensor:
>>>     return x + y
>>>
>>> @rpc.functions.async_execution
>>> @torch.jit.script
>>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]:
>>>     return rpc.rpc_async(to, script_add, (x, y))
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add,
>>>     args=("worker2", torch.ones(2), 1)
>>> )
>>> print(ret)  # prints tensor([2., 2.])

当与静态或类方法结合使用时,此装饰器必须位于内部。

>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> class AsyncExecutionClass:
>>>
>>>     @staticmethod
>>>     @rpc.functions.async_execution
>>>     def static_async_add(to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>>     @classmethod
>>>     @rpc.functions.async_execution
>>>     def class_async_add(cls, to, x, y, z):
>>>         ret_fut = torch.futures.Future()
>>>         rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: ret_fut.set_result(fut.wait() + z)
>>>         )
>>>         return ret_fut
>>>
>>>     @rpc.functions.async_execution
>>>     def bound_async_add(self, to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.static_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.class_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # prints tensor([4., 4.])

此装饰器也与 RRef 工具函数一起使用,例如:torch.distributed.rpc.RRef.rpc_sync()torch.distributed.rpc.RRef.rpc_async()torch.distributed.rpc.RRef.remote()

>>> from torch.distributed import rpc
>>>
>>> # reuse the AsyncExecutionClass class above
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2)
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait()
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here()
>>> print(ret)  # prints tensor([4., 4.])

后端

RPC模块可以利用不同的后端来执行节点之间的通信。要使用的后端可以在init_rpc()函数中通过传递BackendType枚举的特定值来指定。无论使用哪个后端,其余的RPC API都不会改变。每个后端还定义了自己的RpcBackendOptions类的子类,并且可以将该实例传递给init_rpc()以配置后端的行为。

torch.distributed.rpc.BackendType()

可用后端的枚举类。

PyTorch 提供了内置的 BackendType.TENSORPIPE 后端。还可以通过 register_backend() 函数来注册其他的后端。

torch.distributed.rpc.RpcBackendOptions

这是一个封装传递给RPC后端选项的抽象结构。可以通过将此类的一个实例传入init_rpc() 方法,使用特定配置(如RPC超时和要使用的init_method)来初始化RPC。

属性init-method

指定初始化进程组的 URL。默认值为 env://

属性rpc_timeout

一个浮点数,表示所有远程过程调用(RPC)的超时时间。如果RPC在此时间内未完成,则会以超时异常的形式结束。

TensorPipe 后端

TensorPipe代理,默认情况下使用TensorPipe库,该库提供了一种专为机器学习优化的原生点对点通信机制,并从根本上解决了Gloo的一些限制。与Gloo相比,它具有异步性优势,允许多个传输同时进行且互不阻塞。只有在需要时才会按需建立节点之间的连接,并且当一个节点失败时,只会关闭与其相关的连接,其他所有连接将继续正常工作。此外,它可以支持多种不同的传输方式(包括TCP、共享内存、NVLink、InfiniBand等),并能够自动检测它们的可用性并选择最佳传输方式。

TensorPipe 后端在 PyTorch v1.6 中引入,并且正在积极开发中。目前它仅支持 CPU 张量,GPU 支持即将推出。它提供了一种基于 TCP 的传输方式,类似于 Gloo。此外,它可以自动将大型张量分割并多路复用到多个套接字和线程上,以实现高带宽。代理能够自行选择最佳的传输方式,无需人工干预。

示例:

>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>>     "worker1",
>>>     rank=0,
>>>     world_size=2,
>>>     rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
>>>         num_worker_threads=8,
>>>         rpc_timeout=20 # 20 second timeout
>>>     )
>>> )
>>>
>>> # omitting init_rpc invocation on worker2
torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None)[源代码]

TensorPipeAgent 的后端选项,继承自 RpcBackendOptions

参数
  • num_worker_threads (int, 可选) – TensorPipeAgent 使用的线程池中的线程数量(默认值:16)。

  • rpc_timeout (float, 可选) – RPC 请求的默认超时时间(单位:秒,默认为 60 秒)。如果 RPC 在此时间内未完成,则会抛出一个异常。调用者可以在必要时通过 rpc_sync()rpc_async() 为单个 RPC 请求设置不同的超时时间。

  • init_method (str, 可选) – 用于会合的分布式存储初始化的 URL。它可以接受与 init_process_group() 相同参数的有效值(默认: env://)。

  • device_maps (Dict[str, Dict], optional) – 设备映射,表示从当前工作者到被调用者的工作设备之间的关系。键是被调用者的名称,值是一个字典(Dict of int, str, 或 torch.device),用于将当前工作者的设备映射到被调用者的工作设备。(默认值: None)

  • devices (List[int, str, or torch.device>], 可选) – 由 RPC 代理使用的所有本地 CUDA 设备。默认情况下,它将初始化为其自身的device_maps中的所有本地设备以及其对等体的device_maps中对应的设备。在处理 CUDA RPC 请求时,该代理会为这个列表中的每个设备正确同步 CUDA 流。

属性device_maps

设备定位位置。

属性设备

本地代理使用的所有设备。

属性初始化方法

指定初始化进程组的 URL。默认值为 env://

属性num_worker_threads

TensorPipeAgent 线程池中线程的数量。

属性rpc_timeout

一个浮点数,表示所有远程过程调用(RPC)的超时时间。如果RPC在此时间内未完成,则会以超时异常的形式结束。

set_device_map(to, device_map)[源代码]

在每个RPC调用方和被调用方之间设置设备映射。可以通过多次调用此函数来逐步增加设备放置配置。

参数
  • to (str) – 目标函数的名称。

  • device_map (Dict of int, str, or torch.device) – 从此工作者到被调用者之间的设备放置映射。该映射必须是可逆的。

示例

>>> # both workers
>>> def add(x, y):
>>>     print(x)  # tensor([1., 1.], device='cuda:1')
>>>     return x + y, (x + y).to(2)
>>>
>>> # on worker 0
>>> options = TensorPipeRpcBackendOptions(
>>>     num_worker_threads=8,
>>>     device_maps={"worker1": {0: 1}}
>>>     # maps worker0's cuda:0 to worker1's cuda:1
>>> )
>>> options.set_device_map("worker1", {1: 2})
>>> # maps worker0's cuda:1 to worker1's cuda:2
>>>
>>> rpc.init_rpc(
>>>     "worker0",
>>>     rank=0,
>>>     world_size=2,
>>>     backend=rpc.BackendType.TENSORPIPE,
>>>     rpc_backend_options=options
>>> )
>>>
>>> x = torch.ones(2)
>>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1))
>>> # The first argument will be moved to cuda:1 on worker1. When
>>> # sending the return value back, it will follow the invert of
>>> # the device map, and hence will be moved back to cuda:0 and
>>> # cuda:1 on worker0
>>> print(rets[0])  # tensor([2., 2.], device='cuda:0')
>>> print(rets[1])  # tensor([2., 2.], device='cuda:1')
set_devices(devices)[源代码]

设置TensorPipe RPC代理使用的本地设备。在处理CUDA RPC请求时,该代理会为列表中所有设备正确同步CUDA流。

参数

devices (List of int, str, or torch.device) – 由 TensorPipe RPC 代理使用的本地设备列表。

注意

RPC 框架不会自动重试任何 rpc_sync()rpc_async()remote() 调用。这是因为 RPC 框架无法判断一个操作是否幂等以及重试是否会带来安全问题。因此,处理失败并根据需要进行重试是应用程序的责任。由于 RPC 通信基于 TCP,网络故障或间歇性连接问题可能导致调用失败。在这种情况下,应用程序应采取合理的回退策略,并适当重试以避免因过于频繁的重试而使网络过载。

RRef

警告

在使用 CUDA 张量时,目前不支持 RRefs。

RRef(远程 REFerence)是对远程工作者上某种类型T(例如 Tensor)值的引用。此句柄保持被引用的远程值在其所有者端存活,但并不意味着该值将来会被传输到本地工作者。RRefs 可以用于多机训练,通过持有对其他工作者上存在的nn.Modules 的引用,并在训练过程中调用适当的方法来检索或修改其参数。有关详细信息,请参阅 远程引用协议

torch.distributed.rpc.PyRRef(RRef)

封装对远程工作者上某种类型值引用的类。此句柄将保持所引用的远程值在远程工作者上的存活状态。当 1) 在应用代码和本地 RRef 上下文中都没有对该 UserRRef 的引用,或者 2) 应用程序调用了优雅关闭时,UserRRef 将被删除。对已删除的 RRef 调用方法会导致未定义的行为。RRef 实现仅提供尽力而为的错误检测,并且应用程序在调用 rpc.shutdown() 后不应再使用 UserRRefs

警告

RRefs 只能通过 RPC 模块进行序列化和反序列化。在没有 RPC 的情况下,使用 Python pickle、torch save() / load()、JIT save() / load() 等方法对 RRefs 进行序列化和反序列化会导致错误。

参数
  • value (对象) – 需要由该 RRef 包围的值。

  • type_hint (Type, optional) – 作为类型提示传递给 TorchScript 编译器的 Python 类型,用于 value

示例:

以下示例为了简化,省略了 RPC 的初始化和关闭代码。具体细节请参考 RPC 文档。

  1. 通过 rpc.remote 创建 RRef

>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> # get a copy of value from the RRef
>>> x = rref.to_here()
  1. 将本地对象转换为 RRef

>>> import torch
>>> from torch.distributed.rpc import RRef
>>> x = torch.zeros(2, 2)
>>> rref = RRef(x)
  1. 与其它工作者共享RRef

>>> # On both worker0 and worker1:
>>> def f(rref):
>>>   return rref.to_here() + 1
>>> # On worker0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch.distributed.rpc import RRef
>>> rref = RRef(torch.zeros(2, 2))
>>> # the following RPC shares the rref with worker1, reference
>>> # count is automatically updated.
>>> rpc.rpc_sync("worker1", f, args=(rref,))
backward(self:torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_id=-1, retain_graph=False) None

使用 RRef 作为根节点来运行反向传播过程。如果提供了 dist_autograd_ctx_id,则从 RRef 的所有者开始执行分布式反向传递,并使用提供的 ctx_id。在这种情况下,应通过调用get_gradients() 来获取梯度值。如果 dist_autograd_ctx_idNone,则认为这是一个本地的 autograd 图,并且仅执行本地反向传递。在本地情况下,调用此 API 的节点必须是 RRef 的所有者。RRef 的值应为标量张量。

参数
  • dist_autograd_ctx_id (int, 可选) – 用于检索梯度的分布式自动微分上下文 ID,默认值为 -1。

  • retain_graph (bool, optional) – 如果为 False,用于计算梯度的图将被释放。注意,在几乎所有情况下设置此选项为 True 是不必要的,并且通常可以以更有效的方式解决该问题。通常,你需要将其设置为 True 以便多次运行 backward(默认值:False)。

示例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     rref.backward(context_id)
confirmed_by_owner(self:torch._C._distributed_rpc.PyRRef) bool

返回此 RRef 是否已由所有者确认。对于 OwnerRRef,总是返回 true;而对于 UserRRef,只有在所有者知道该 UserRRef 时才返回 true。

is_owner(self:torch._C._distributed_rpc.PyRRef) bool

返回当前节点是否是该 RRef 的所有者。

local_value(self:torch._C._distributed_rpc.PyRRef) object

如果当前节点是所有者,则返回本地值的引用;否则,抛出异常。

owner(self:torch._C._distributed_rpc.PyRRef) torch._C._distributed_rpc.WorkerInfo

返回与此 RRef 所属节点相关的工作者信息。

owner_name(self:torch._C._distributed_rpc.PyRRef) str

返回拥有此 RRef 的节点的工人名称。

remote(self:torch._C._distributed_rpc.PyRRef, timeout=-1.0) object

创建一个辅助代理,以便轻松启动remote调用,并使用RRef的所有者作为目标,在该RRef引用的对象上运行函数。更具体地说,rref.remote().func_name(*args, **kwargs)等同于以下内容:

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
参数

timeout (float, 可选) – rref.remote() 的超时时间。如果在指定时间内 RRef 创建未成功完成,则下次尝试使用该 RRef(如 to_here)时将引发超时异常。如果没有提供此参数,默认的 RPC 超时时间将会被使用。请参阅 rpc.remote() 以了解有关 RRef 的具体超时规则。

示例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.remote().size().to_here()  # returns torch.Size([2, 2])
>>> rref.remote().view(1, 4).to_here()  # returns tensor([[1., 1., 1., 1.]])
rpc_async(self:torch._C._distributed_rpc.PyRRef, timeout:float=-1.0) object

创建一个辅助代理,以便轻松使用 RRef 的所有者作为目标来启动 rpc_async 调用,并在该 RRef 引用的对象上运行函数。更具体地说,rref.rpc_async().func_name(*args, **kwargs) 等同于以下内容:

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
参数

timeout (float, 可选) – rref.rpc_async() 的超时时间。如果在此时间段内调用未完成,则会抛出一个异常。如果没有提供此参数,则使用默认的 RPC 超时时间。

示例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_async().size().wait()  # returns torch.Size([2, 2])
>>> rref.rpc_async().view(1, 4).wait()  # returns tensor([[1., 1., 1., 1.]])
rpc_sync(self:torch._C._distributed_rpc.PyRRef, timeout:float=-1.0) object

创建一个辅助代理,以便轻松启动rpc_sync,使用RRef的所有者作为目标来运行此RRef引用的对象上的函数。更具体地说,rref.rpc_sync().func_name(*args, **kwargs) 与以下内容相同:

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
参数

timeout (float, 可选) – rref.rpc_sync() 的超时时间。如果在此时间段内调用未完成,则会抛出一个异常。如果没有提供此参数,则使用默认的 RPC 超时时间。

示例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_sync().size()  # returns torch.Size([2, 2])
>>> rref.rpc_sync().view(1, 4)  # returns tensor([[1., 1., 1., 1.]])
to_here(self:torch._C._distributed_rpc.PyRRef, timeout=-1.0) object

阻塞调用,将 RRef 的值从所有者节点复制到本地节点并返回。如果当前节点是所有者节点,则返回本地值的引用。

参数

timeout (float, 可选) – to_here 的超时时间。如果调用在此时间段内未完成,则会抛出一个异常。如果没有提供此参数,则使用默认的 RPC 超时时间(60秒)。

RemoteModule

警告

当前使用 CUDA 张量时不支持 RemoteModule

RemoteModule 是在不同进程中创建一个 nn.Module 的简便方法。实际模块位于远程主机上,但本地主机可以像调用常规的 nn.Module 一样通过句柄来调用它。然而,这种调用会触发对远程端的 RPC 调用,并且可以通过 RemoteModule 提供的额外 API 实现异步执行。

torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs)[源代码]

只有在完成 RPC 初始化之后,才能创建 RemoteModule 实例。

它在一个指定的远程节点上创建一个用户定义的模块,并像常规的 nn.Module 一样工作,除了 forward 方法在远程节点执行。它还负责自动梯度记录,以确保反向传播将梯度传递回相应的远程模块。

它根据 module_clsforward 方法签名生成两个方法:forward_asyncforward。其中,forward_async 异步运行并返回一个 Future 对象。forward_asyncforward 的参数与由 module_cls 返回的模块中的 forward 方法相同。

例如,如果module_cls 返回一个nn.Linear 实例,并且该实例具有如下的 forward 方法签名:def forward(input: Tensor) -> Tensor:,那么生成的RemoteModule 将包含以下两个方法:

def forward(input: Tensor) -> Tensor:
```python def forward_async(input: Tensor) -> Future[Tensor]: ```
参数
  • remote_device (str) – 目标工作者上的设备名称,用于指定放置模块的位置。格式为“<workername>/<device>”,其中设备字段可以解析为 torch.device 类型。例如,“trainer0/cpu”表示在名为 trainer0 的工作者的 CPU 上放置模块;“ps0/cuda:0” 表示在名为 ps0 的工作者的第一个 GPU 上放置模块。如果省略设备字段,默认值为 “cpu”。

  • module_cls (nn.Module) –

    用于远程创建模块的类。例如:

    >>> class MyModule(nn.Module):
    >>>     def forward(input):
    >>>         return input + 1
    >>>
    >>> module_cls = MyModule
    
  • args (Sequence, optional) – 将传递给 module_cls 的参数。

  • kwargs (Dict, optional) – 传递给 module_cls 的可选关键字参数。

返回值

这是一个远程模块实例,它封装了由用户提供的 module_cls 创建的 Module。它包含一个阻塞的 forward 方法和一个异步的 forward_async 方法,后者返回在远程侧用户提供的模块上的 forward 调用的 future 对象。

示例:

在两个不同的进程中运行以下代码:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch import nn, Tensor
>>> from torch.distributed.nn.api.remote_module import RemoteModule
>>>
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> remote_linear_module = RemoteModule(
>>>     "worker1/cpu", nn.Linear, args=(20, 30),
>>> )
>>> input = torch.randn(128, 20)
>>> ret_fut = remote_linear_module.forward_async(input)
>>> ret = ret_fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>>
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

此外,结合DistributedDataParallel (DDP) 的一个更实用的例子可以在该 教程 中找到。

获取模块引用()

返回一个指向远程模块的 RRef(即 RRef[nn.Module])。

返回类型

RRef[模块]

remote_parameters(recurse=True)

返回一个包含指向远程模块参数的 RRef 列表。

这通常可以与 DistributedOptimizer 结合使用。

参数

recurse (bool) – 如果为 True,则返回远程模块及其所有子模块的参数。否则,仅返回远程模块直接成员的参数。

返回值

一个包含远程模块参数的 RRef 列表 (List[RRef[nn.Parameter]])。

返回类型

[ ]

分布式自动求导框架

警告

当前使用 CUDA 张量时不支持分布式自动微分

此模块提供了一个基于 RPC 的分布式自动微分框架,适用于模型并行训练等应用。简而言之,应用程序可以通过 RPC 发送和接收梯度记录张量。在前向传递过程中,我们记录了通过 RPC 发送的梯度记录张量,在反向传递时使用这些信息通过 RPC 执行分布式反向传递。更多详细信息请参见分布式自动微分设计

torch.distributed.autograd.backward(context_id:int, roots:List[Tensor], retain_graph=False) None

使用提供的根节点启动分布式反向传递。当前实现的是快速模式算法,该算法假设在同一分布式自动微分上下文中发送的所有 RPC 消息都会在反向传播过程中成为自动微分图的一部分。

我们利用提供的根节点来识别自动求导图,并计算相应的依赖关系。该方法会一直阻塞,直到整个自动求导过程完成。

我们在每个节点上适当的torch.distributed.autograd.context中累积梯度。当调用torch.distributed.autograd.backward()时,会根据传入的context_id查找对应的autograd上下文。如果没有找到与给定ID匹配的有效autograd上下文,则抛出错误。你可以使用get_gradients() API来获取累积的梯度。

参数
  • context_id (int) – 自动微分的上下文标识符,用于检索梯度。

  • roots (列表) – 表示自动微分计算起点的张量。所有的张量都应该为标量。

  • retain_graph (bool, 可选) – 如果为 False,用于计算 grad 的图将被释放。注意,在绝大多数情况下,设置此选项为 True 是不必要的,并且通常可以采用更有效的方式解决该问题。通常,你需要将其设置为 True 才能多次运行 backward。

示例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     pred = model.forward()
>>>     loss = loss_func(pred, loss)
>>>     dist_autograd.backward(context_id, loss)
torch.distributed.autograd.context[源代码]

用于在使用分布式自动微分时封装前向和后向传递的上下文对象。在 with 语句中生成的 context_id 是唯一标识所有工作节点上的分布式反向传递所必需的。每个工作节点存储与此 context_id 相关的元数据,这是正确执行分布式自动微分传递所必需的。

示例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum()
>>>     dist_autograd.backward(context_id, [loss])
torch.distributed.autograd.get_gradients(context_id:int) Dict[Tensor,Tensor]

根据分布式自动微分反向传递中的context_id,检索一个从张量到其相应梯度的映射,该梯度累积在给定的上下文中。

参数

context_id (int) – 自动微分的上下文标识符,用于检索梯度。

返回值

一个映射表,其中键是张量,值是对应的梯度。

示例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = t1 + t2
>>>     dist_autograd.backward(context_id, [loss.sum()])
>>>     grads = dist_autograd.get_gradients(context_id)
>>>     print(grads[t1])
>>>     print(grads[t2])

分布式优化器

请参阅torch.distributed.optim 页面,了解分布式优化器的相关文档。

设计笔记

分布式自动微分设计说明介绍了基于RPC的分布式自动微分框架的设计,这对于模型并行训练等应用非常有用。

The RRef设计笔记介绍了RRef(远程引用)协议的设计,该协议用于框架通过远程工作者来引用值。

教程

RPC 教程向用户介绍了 RPC 框架,提供了几个使用 torch.distributed.rpc API 的示例应用程序,并演示了如何使用 分析器 来分析基于 RPC 的工作负载。

本页目录