远程引用协议

本文档详细描述了远程引用协议的设计,并展示了不同场景下的消息流程。请在继续阅读前确保你已经熟悉了分布式RPC框架

背景

RRef 代表远程 REFerence。它是指位于本地或远程工作者上的对象引用,并且在内部透明地处理引用计数。概念上,它可以被视为分布式共享指针。应用程序可以通过调用 remote() 创建 RRef。每个 RRef 由 remote() 调用的被调用者工作者(即所有者)拥有,并且可以被多个用户使用。所有者存储实际数据并跟踪全局引用计数。每个 RRef 可以通过一个全局 RRefId 唯一标识,该 ID 在创建 remote() 调用时分配。

在拥有者工作者中,只有一个包含实际数据的OwnerRRef实例。而在用户工作者上,可以创建任意数量的UserRRefs实例,并且这些UserRRef不持有任何数据。所有在拥有者上的使用都会通过全局唯一的RRefId来获取该唯一的OwnerRRef实例。当作为rpc_sync()rpc_async()remote()调用中的参数或返回值使用时,会创建一个UserRRef实例,并且拥有者将根据更新引用计数的通知。当全局没有UserRRef实例并且在拥有者上也没有对OwnerRRef的任何引用时,该OwnerRRef及其数据将会被删除。

假设

RRef协议的设计基于以下假设。

  • 临时网络故障:RRef 设计通过重试消息来处理临时网络故障。它无法处理节点崩溃或永久性网络分区。当这些情况发生时,应用程序应关闭所有工作者,回滚到之前的检查点,并继续训练。

  • 非幂等 UDF: 我们假设提供给 rpc_sync()rpc_async() 或者 remote() 的用户定义函数(UDF)是非幂等的,因此不能重试。然而,内部 RRef 控制消息是幂等的,并且在消息失败时会自动重试。

  • 消息乱序交付:由于发送方和接收方都使用了多个线程,我们不假设任何一对节点之间的消息交付顺序。因此,无法保证哪条消息会先被处理。

RRef 生命周期

该协议的目标是在适当的时间删除一个OwnerRRef。删除OwnerRRef的最佳时机是当没有存活的UserRRef实例,并且用户代码也没有引用OwnerRRef时。难点在于如何判断是否存在任何存活的UserRRef实例。

设计理由

用户可以在三种不同情况下获取 UserRRef

  1. 接收来自所有者的UserRRef

  2. 接收来自另一用户的UserRRef

  3. 创建一个新的由另一个工作者拥有的 UserRRef

案例1是最简单的场景:拥有者将其 RRef 传递给用户,并通过调用rpc_sync()rpc_async()remote(),并将 RRef 作为参数传递。在这种情况下,用户端将创建一个新的UserRRef对象。由于拥有者是调用方,它可以轻松地更新其本地的 OwnerRRef 引用计数。

唯一的要求是,在销毁时,任何 UserRRef 都必须通知其拥有者。因此,我们需要第一个保证:

G1. 当任何用户引用(UserRRef)被删除时,所有者将会收到通知。

由于消息可能会延迟或乱序到达,我们需要额外的保障来确保删除消息不会被过早处理。如果A向B发送一条涉及RRef的消息,我们会在A(父RRef)上调用该RRef,并在B(子RRef)上调用相应的RRef。

G2. 在子级 RRef 获得所有者确认之前,父级 RRef 不会被删除。

在情况 2 和 3 中,所有者可能只对 RRef 分叉图有部分了解或完全不了解。例如,在用户上可以创建一个 RRef,并且在所有者接收到任何 RPC 调用之前,创建该 RRef 的用户可能会将其与其他人共享,而这些人又可以进一步分享该 RRef。一个不变量是,任何 RRef 的分叉图始终是一个树形结构,因为分叉一个 RRef 总是在调用方(除非调用方是所有者)上创建一个新的 UserRRef 实例,并且因此每个 RRef 只有一个父节点。

树中每个 UserRRef 的所有者视角包含三个阶段:

1) unknown -> 2) known -> 3) deleted.

所有者对整个树的视图会不断变化。当所有者认为没有存活的 UserRRef 实例时,它会删除其 OwnerRRef 实例。也就是说,在删除 OwnerRRef 时,所有的 UserRRef 实例可能已经被确实删除或未知。危险的情况是当某些分支的状态未知而其他分支已被删除。

G2 明确保证了在所有子 UserRRef 实例被其所有者知晓之前,没有任何父 UserRRef 被删除。然而,可能存在子 UserRRef 在其所有者知道其父 UserRRef 之前就被删除的情况。

考虑以下示例:从 OwnerRRef 开始分叉到 A,A 再分叉到 Y,最后 Y 分叉到 Z。

OwnerRRef -> A -> Y -> Z

如果 Z 的所有消息(包括删除消息)都在接收者处理 Y 的消息之前由所有者处理,那么所有者会在知道 Y 存在之前得知 Z 已经被删除。然而,这并不会造成任何问题。因为至少有一个 Y 的祖先 A 仍然存在,并且它会阻止所有者删除 OwnerRRef。更具体地说,如果所有者不知道 Y,则根据G2规则,A 不能被删除,并且由于所有者是 A 的父节点,所以知道 A。

如果 RRef 在用户端创建,情况会变得稍微复杂一些:

OwnerRRef
    ^
    |
    A -> Y -> Z

如果 Z 调用 to_here() 方法在 UserRRef 上,那么所有者会在 Z 被删除时知道 A 的存在,否则 to_here() 不会完成。 如果 Z 没有调用 to_here() 方法,则有可能所有者在收到任何来自 A 和 Y 的消息之前已经收到了 Z 发送的所有消息。 在这种情况下,由于 OwnerRRef 的实际数据尚未创建,因此没有需要删除的内容。 这与 Z 完全不存在的情况相同。 因此,这种情况仍然可以接受。

实现

G1 通过在 UserRRef 析构函数中发送删除消息来实现。为了提供 G2,每当父 UserRRef 被 fork 时,它会被放入一个由新 ForkId 索引的上下文中。只有当父 UserRRef 收到子节点发送的确认消息(ACK)后,才会从上下文中移除;而子节点仅在收到所有者确认后才会发出 ACK。

协议场景

现在让我们讨论上述设计在四种情况下是如何体现为协议的。

用户与拥有者通过 RRef 进行共享

import torch
import torch.distributed.rpc as rpc

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rref.to_here()

在这种情况下,在用户工作者 A 上创建 UserRRef,然后将其与远程消息一起传递给拥有者工作者 B。B 收到后会创建 OwnerRRef。方法 remote() 立即返回,这意味着在拥有者知道之前,UserRRef 可以被 fork 或使用。

在拥有者端,当接收到remote()调用时,会创建一个OwnerRRef并返回一个确认信息(ACK)来确认{100, 1}RRefId, ForkId)。只有在接收到这个确认信息之后,A才能删除其UserRRef。这涉及G1G2两个部分。G1是显而易见的。对于G2OwnerRRefUserRRef的孩子,并且在接收到拥有者的确认信息之前,UserRRef不会被删除。

user_to_owner_ret.png

上图展示了消息流程:实线箭头表示用户函数,虚线箭头表示内置消息。需要注意的是,从 A 到 B 的前两条消息(remote()to_here())可以以任意顺序到达 B,但最终的删除消息仅在满足特定条件时发送。

  • B 认可 UserRRef {100, 1} (G2),并且

  • UserRRef 不再在作用域内并可被垃圾回收时,Python GC 会同意删除该本地实例。

用户以拥有者为参数共享RRef

import torch
import torch.distributed.rpc as rpc

# on worker A and worker B
def func(rref):
  pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('B', func, args=(rref, ))

在这种情况下,创建UserRRef后,A会将它作为参数传递给后续对B的RPC调用。A会保持UserRRef {100, 1}存活直到收到B的确认(G2),而不是等待RPC调用的返回值。这样做是因为A需要在接收到所有先前消息之前不会发送删除消息,否则OwnerRRef可能会在使用前被删除,因为我们不能保证消息传递顺序。通过创建一个作为RRef子项的ForkId并将其保留在映射中直到收到所有者确认该ForkId为止来实现这一点。下图显示了消息流程。

user_to_owner_arg.png

注意,在 B 上,UserRRef 可能在 func 完成之前甚至开始之前就被删除了。然而这是可以接受的,因为在 B 发送子 ForkId 的 ACK 时,它已经获取了 OwnerRRef 实例,这会防止其过早被删除。

将所有权的RRef共享给用户

所有者到用户是最简单的情形,此时所有者可以本地更新引用计数,并且不需要发送额外的控制消息通知其他人。对于G2而言,它等同于父节点立即从所有者那里收到ACK的情况,因为在这个场景中父节点就是所有者。

import torch
import torch.distributed.rpc as RRef, rpc

# on worker B and worker C
def func(rref):
  pass

# on worker B, creating a local RRef
rref = RRef("data")
# say the rref has RRefId 100
dist.rpc_async('C', func, args=(rref, ))
owner_to_user.png

上图展示了消息流。需要注意的是,在 rpc_async 调用之后,当 OwnerRRef 退出作用域时,并不会被删除。这是因为内部有一个映射来保持其存活状态,如果有已知的分支存在的话,这种情况下的 UserRRef 就是 {100, 1}。 (G2)

用户之间共享RRef

这是最复杂的情形,涉及调用者用户(父 UserRRef)、被调用者用户(子 UserRRef)以及所有者三者的共同参与。

import torch
import torch.distributed.rpc as rpc

# on worker A and worker C
def func(rref):
  pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('C', func, args=(rref, ))
user_to_user.png

当 C 从 A 接收到 UserRRef 子对象时,它会向所有者 B 发送一个 fork 请求。稍后,当 B 在 C 上确认了该 UserRRef,C 将并行执行两个操作:1)发送子对象 ACK 给 A;2)运行用户提供的函数。在此期间,父对象(A)将保持其 UserRRef {100, 1} 活跃以实现G2

本页目录