使用 PyTorch 编写分布式应用程序
作者: Séb Arnold
前提条件:
在这个简短的教程中,我们将介绍 PyTorch 的分布式包。我们将学习如何设置分布式环境,使用不同的通信策略,并深入了解该包的一些内部机制。
设置
PyTorch 中包含的分布式包(即 torch.distributed
)使研究人员和开发者能够轻松地在多个进程和机器集群之间并行化计算。为此,它利用了消息传递语义,允许每个进程与其他任何进程进行数据通信。与多进程包(torch.multiprocessing
)不同,这些进程可以使用不同的通信后端,并且不局限于在同一台机器上执行。
为了入门,我们需要能够同时运行多个进程。如果您可以访问计算集群,应该向本地系统管理员咨询或使用您喜欢的协调工具(例如 pdsh、clustershell 或 slurm)。在本教程中,我们将使用单台机器,并通过以下模板生成多个进程。
"""run.py:"""
#!/usr/bin/env python
importos
importsys
importtorch
importtorch.distributedasdist
importtorch.multiprocessingasmp
defrun(rank, size):
""" Distributed function to be implemented later. """
pass
definit_process(rank, size, fn, backend='gloo'):
""" Initialize the distributed environment. """
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group(backend, rank=rank, world_size=size)
fn(rank, size)
if __name__ == "__main__":
world_size = 2
processes = []
if "google.colab" in sys.modules:
print("Running in Google Colab")
mp.get_context("spawn")
else:
mp.set_start_method("spawn")
for rank in range(world_size):
p = mp.Process(target=init_process, args=(rank, world_size, run))
p.start()
processes.append(p)
for p in processes:
p.join()
上述脚本会生成两个进程,每个进程都会设置分布式环境,初始化进程组(dist.init_process_group
),并最终执行给定的 run
函数。
让我们来看看 init_process
函数。它确保每个进程都能通过一个主节点进行协调,使用相同的 IP 地址和端口。请注意,我们使用了 gloo
后端,但其他后端也是可用的。(参见 第5.1节)我们将在本教程的末尾详细讲解 dist.init_process_group
中的魔法,但它的本质是允许进程通过共享它们的位置来相互通信。
点对点通信
从一个进程向另一个进程传输数据的过程称为点对点通信。这是通过 send
和 recv
函数或其 即时 对应函数 isend
和 irecv
来实现的。
"""Blocking point-to-point communication."""
defrun(rank, size):
tensor = torch.zeros(1)
if rank == 0:
tensor += 1
# Send the tensor to process 1
dist.send(tensor=tensor, dst=1)
else:
# Receive tensor from process 0
dist.recv(tensor=tensor, src=0)
print('Rank ', rank, ' has data ', tensor[0])
在上面的例子中,两个进程都从一个零张量开始,然后进程 0 对张量进行递增并将其发送给进程 1,因此它们最终都会得到 1.0。需要注意的是,进程 1 需要分配内存以存储它将接收的数据。
另外需要注意的是,send/recv
是阻塞的:两个进程都会阻塞,直到通信完成。而立即操作是非阻塞的;脚本会继续执行,这些方法会返回一个 Work
对象,我们可以选择对其调用 wait()
。
"""Non-blocking point-to-point communication."""
defrun(rank, size):
tensor = torch.zeros(1)
req = None
if rank == 0:
tensor += 1
# Send the tensor to process 1
req = dist.isend(tensor=tensor, dst=1)
print('Rank 0 started sending')
else:
# Receive tensor from process 0
req = dist.irecv(tensor=tensor, src=0)
print('Rank 1 started receiving')
req.wait()
print('Rank ', rank, ' has data ', tensor[0])
在使用立即数时,我们必须谨慎处理发送和接收的张量。由于我们无法确定数据何时会被传输到其他进程,因此在 req.wait()
完成之前,我们不应修改发送的张量,也不应访问接收的张量。换句话说,
-
在
dist.isend()
之后对tensor
进行写入操作将导致未定义行为。 -
在
dist.irecv()
之后对tensor
进行读取操作将导致未定义行为,直到req.wait()
被执行。
然而,在执行 req.wait()
之后,我们可以确信通信已经发生,并且存储在 tensor[0]
中的值为 1.0。
点对点通信在我们希望对进程间的通信进行更精细控制时非常有用。它们可以用来实现复杂的算法,例如 Baidu’s DeepSpeech 或 Facebook’s large-scale experiments 中使用的算法。(参见 Section 4.1)
集体通信
与点对点通信不同,集合通信允许在组中的所有进程之间进行通信模式。一个组是我们所有进程的一个子集。要创建一个组,我们可以将一组 rank 传递给 dist.new_group(group)
。默认情况下,集合通信在所有进程中执行,这也被称为全局通信。例如,为了获取所有进程中所有张量的总和,我们可以使用 dist.all_reduce(tensor, op, group)
集合通信。
""" All-Reduce example."""
defrun(rank, size):
""" Simple collective communication. """
group = dist.new_group([0, 1])
tensor = torch.ones(1)
dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
print('Rank ', rank, ' has data ', tensor[0])
由于我们需要计算组内所有张量的总和,因此使用 dist.ReduceOp.SUM
作为归约操作符。一般来说,任何满足交换律的数学运算都可以用作操作符。PyTorch 默认提供了许多此类操作符,它们均在元素级别上工作:
-
dist.ReduceOp.SUM
, -
dist.ReduceOp.PRODUCT
, -
dist.ReduceOp.MAX
, -
dist.ReduceOp.MIN
, -
dist.ReduceOp.BAND
, -
dist.ReduceOp.BOR
, -
dist.ReduceOp.BXOR
, -
dist.ReduceOp.PREMUL_SUM
.
支持的完整操作符列表请参见此处。
除了 dist.all_reduce(tensor, op, group)
之外,PyTorch 目前还实现了许多其他集合操作。以下是一些支持的集合操作。
-
dist.broadcast(tensor, src, group)
:将tensor
从src
复制到所有其他进程。 -
dist.reduce(tensor, dst, op, group)
:对每个tensor
应用op
,并将结果存储在dst
中。 -
dist.all_reduce(tensor, op, group)
:与 reduce 相同,但结果存储在所有进程中。 -
dist.scatter(tensor, scatter_list, src, group)
:将scatter_list[i]
中的第 \(i^{\text{th}}\) 个张量复制到第 \(i^{\text{th}}\) 个进程。 -
dist.gather(tensor, gather_list, dst, group)
:从所有进程中复制tensor
到dst
。 -
dist.all_gather(tensor_list, tensor, group)
:从所有进程中复制tensor
到tensor_list
,并在所有进程中存储。 -
dist.barrier(group)
:阻塞组中的所有进程,直到每个进程都进入此函数。 -
dist.all_to_all(output_tensor_list, input_tensor_list, group)
:将输入张量列表分散到组中的所有进程,并在输出列表中返回收集的张量列表。
支持的集合操作完整列表可以通过查看 PyTorch Distributed 的最新文档 (链接) 找到。
分布式训练
注意: 您可以在 GitHub 仓库 中找到本节的示例脚本。
在理解了分布式模块的工作原理后,让我们用它编写一些实用的代码。我们的目标是复制 DistributedDataParallel 的功能。当然,这将是一个教学示例,在实际应用中,您应该使用上面链接的官方、经过充分测试和优化的版本。
简单来说,我们想要实现一个分布式版本的随机梯度下降。我们的脚本将让所有进程在其数据批次上计算模型的梯度,然后对梯度进行平均。为了确保在更改进程数量时获得相似的收敛结果,我们首先需要对数据集进行分区。(您也可以使用 torch.utils.data.random_split,而不是下面的代码片段。)
""" Dataset partitioning helper """
classPartition(object):
def__init__(self, data, index):
self.data = data
self.index = index
def__len__(self):
return len(self.index)
def__getitem__(self, index):
data_idx = self.index[index]
return self.data[data_idx]
classDataPartitioner(object):
def__init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
self.data = data
self.partitions = []
rng = Random() # from random import Random
rng.seed(seed)
data_len = len(data)
indexes = [x for x in range(0, data_len)]
rng.shuffle(indexes)
for frac in sizes:
part_len = int(frac * data_len)
self.partitions.append(indexes[0:part_len])
indexes = indexes[part_len:]
defuse(self, partition):
return Partition(self.data, self.partitions[partition])
通过上述代码片段,我们现在可以轻松地使用以下几行代码对任何数据集进行分区:
""" Partitioning MNIST """
defpartition_dataset():
dataset = datasets.MNIST('./data', train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
size = dist.get_world_size()
bsz = 128 // size
partition_sizes = [1.0 / size for _ in range(size)]
partition = DataPartitioner(dataset, partition_sizes)
partition = partition.use(dist.get_rank())
train_set = torch.utils.data.DataLoader(partition,
batch_size=bsz,
shuffle=True)
return train_set, bsz
假设我们有两个副本,那么每个进程将拥有 60000 / 2 = 30000 个样本的 train_set
。为了保持 整体 的批量大小为 128,我们还将批量大小除以副本的数量。
现在,我们可以编写常见的前向-后向-优化训练代码,并添加一个函数调用来对模型的梯度进行平均。(以下内容主要参考了官方的 PyTorch MNIST 示例。)
""" Distributed Synchronous SGD Example """
defrun(rank, size):
torch.manual_seed(1234)
train_set, bsz = partition_dataset()
model = Net()
optimizer = optim.SGD(model.parameters(),
lr=0.01, momentum=0.5)
num_batches = ceil(len(train_set.dataset) / float(bsz))
for epoch in range(10):
epoch_loss = 0.0
for data, target in train_set:
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
epoch_loss += loss.item()
loss.backward()
average_gradients(model)
optimizer.step()
print('Rank ', dist.get_rank(), ', epoch ',
epoch, ': ', epoch_loss / num_batches)
接下来需要实现 average_gradients(model)
函数,该函数接收一个模型并对其梯度在全球范围内进行平均。
""" Gradient averaging. """
defaverage_gradients(model):
size = float(dist.get_world_size())
for param in model.parameters():
dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
param.grad.data /= size
就是这样!我们成功实现了分布式同步 SGD,并可以在大型计算机集群上训练任何模型。
**注意:**虽然最后一句话在技术上是正确的,但要实现生产级别的同步 SGD,还需要更多的技巧。再次强调,请使用经过测试和优化的方案。
我们自己的环形Allreduce
作为一个额外的挑战,假设我们希望实现 DeepSpeech 的高效环形 allreduce。使用点对点集合通信可以相对容易地实现这一点。
""" Implementation of a ring-reduce with addition. """
defallreduce(send, recv):
rank = dist.get_rank()
size = dist.get_world_size()
send_buff = send.clone()
recv_buff = send.clone()
accum = send.clone()
left = ((rank - 1) + size) % size
right = (rank + 1) % size
for i in range(size - 1):
if i % 2 == 0:
# Send send_buff
send_req = dist.isend(send_buff, right)
dist.recv(recv_buff, left)
accum[:] += recv_buff[:]
else:
# Send recv_buff
send_req = dist.isend(recv_buff, right)
dist.recv(send_buff, left)
accum[:] += send_buff[:]
send_req.wait()
recv[:] = accum[:]
在上述脚本中,allreduce(send, recv)
函数的签名与 PyTorch 中的略有不同。它接收一个 recv
张量,并将所有 send
张量的总和存储在其中。作为留给读者的练习,我们的版本与 DeepSpeech 中的版本还有一个区别:他们的实现将梯度张量划分为 块,以最优地利用通信带宽。(提示:torch.chunk)
高级主题
现在我们准备探索 torch.distributed
的一些更高级功能。由于内容较多,本节分为两个子部分:
-
通信后端:了解如何使用 MPI 和 Gloo 进行 GPU 间的通信。
-
初始化方法:理解如何在
dist.init_process_group()
中最佳地设置初始协调阶段。
通信后端
torch.distributed
最优雅的方面之一是其能够抽象并构建在不同后端之上。正如之前提到的,PyTorch 中实现了多个后端。其中一些最流行的包括 Gloo、NCCL 和 MPI。它们各自有不同的规格和权衡,具体取决于预期的使用场景。支持功能的对比表可以在这里找到。
Gloo 后端
到目前为止,我们已经广泛使用了Gloo 后端。作为一个开发平台,它非常方便,因为它包含在预编译的 PyTorch 二进制文件中,并且在 Linux(自 0.2 版本起)和 macOS(自 1.3 版本起)上都可以运行。它支持 CPU 上的所有点对点和集体操作,以及 GPU 上的所有集体操作。对于 CUDA 张量的集体操作实现,没有 NCCL 后端提供的优化程度高。
您肯定已经注意到,如果我们将 model
放在 GPU 上,我们的分布式 SGD 示例将无法正常工作。为了使用多个 GPU,我们还需要进行以下修改:
-
使用
device = torch.device("cuda:{}".format(rank))
-
model = Net()
\(\rightarrow\)model = Net().to(device)
-
使用
data, target = data.to(device), target.to(device)
通过上述修改,我们的模型现在可以在两个 GPU 上进行训练,您可以使用 watch nvidia-smi
来监控它们的利用率。
MPI 后端
消息传递接口(MPI)是高性能计算领域中的标准化工具。它支持点对点和集体通信,并且是 torch.distributed
API 的主要灵感来源。MPI 有多种实现(例如 Open-MPI、MVAPICH2、Intel MPI),每种实现都针对不同的用途进行了优化。使用 MPI 后端的优势在于其在大规模计算机集群中的广泛可用性和高度优化。一些 最近的 实现 还能够利用 CUDA IPC 和 GPU Direct 技术,以避免通过 CPU 进行内存拷贝。
遗憾的是,PyTorch 的二进制文件无法包含 MPI 实现,因此我们必须手动重新编译。幸运的是,这个过程相当简单,因为 PyTorch 在编译时会自动查找可用的 MPI 实现。以下步骤通过从源码安装 PyTorch 来安装 MPI 后端。
-
创建并激活您的 Anaconda 环境,按照指南安装所有先决条件,但暂时不要运行
python setup.py install
。 -
选择并安装您喜欢的 MPI 实现。请注意,启用 CUDA-aware MPI 可能需要一些额外的步骤。在我们的示例中,我们将使用 不支持 GPU 的 Open-MPI:
conda install -c conda-forge openmpi
。 -
现在,进入您克隆的 PyTorch 仓库并执行
python setup.py install
。
为了测试我们新安装的后端,需要进行一些修改。
-
将
if __name__ == '__main__':
下的内容替换为init_process(0, 0, run, backend='mpi')
。 -
运行
mpirun -n 4 python myscript.py
。
这些更改的原因是MPI需要在生成进程之前创建自己的环境。MPI还会生成自己的进程,并执行初始化方法中描述的握手操作,这使得init_process_group
中的rank
和size
参数变得多余。这实际上非常强大,因为您可以通过向mpirun
传递额外的参数来为每个进程定制计算资源。(例如每个进程的核心数、手动为特定rank分配机器,以及更多内容)这样做,您应该会得到与其他通信后端相同的熟悉输出。
NCCL 后端
NCCL 后端 提供了针对 CUDA 张量的集体操作优化实现。如果您仅使用 CUDA 张量进行集体操作,建议使用此后端以获得最佳性能。NCCL 后端已包含在支持 CUDA 的预构建二进制文件中。
初始化方法
在本教程的结尾,让我们回顾一下最初调用的函数:dist.init_process_group(backend, init_method)
。具体来说,我们将讨论负责在每个进程之间进行初步协调的各种初始化方法。这些方法使您能够定义这种协调是如何完成的。
初始化方法的选择取决于您的硬件设置,某些方法可能比其他方法更合适。除了以下部分,请参阅官方文档以获取更多信息。
环境变量
在本教程中,我们一直在使用环境变量初始化方法。通过在所有机器上设置以下四个环境变量,所有进程将能够正确连接到主节点,获取有关其他进程的信息,并最终与它们进行握手。
-
MASTER_PORT
: 主机上用于运行 rank 0 进程的空闲端口。 -
MASTER_ADDR
: 用于运行 rank 0 进程的主机的 IP 地址。 -
WORLD_SIZE
: 进程的总数,以便主进程知道需要等待多少个工作进程。 -
RANK
: 每个进程的 rank,以便它们知道自己是主进程还是工作进程。
共享文件系统
共享文件系统要求所有进程都能访问同一个共享文件系统,并通过共享文件进行协调。这意味着每个进程都会打开该文件,写入其信息,并等待所有其他进程完成相同操作。之后,所有所需信息将立即对所有进程可用。为了避免竞争条件,文件系统必须支持通过 fcntl 进行锁定。
dist.init_process_group(
init_method='file:///mnt/nfs/sharedfile',
rank=args.rank,
world_size=4)
TCP
通过 TCP 进行初始化可以通过提供 rank 为 0 的进程的 IP 地址和可访问的端口号来实现。在这里,所有工作节点将能够连接到 rank 为 0 的进程,并交换有关如何相互访问的信息。
dist.init_process_group(
init_method='tcp://10.1.1.20:23456',
rank=args.rank,
world_size=4)
我要感谢 PyTorch 开发者在实现、文档和测试方面所做的出色工作。当代码不清晰时,我总是可以依赖 文档 或 测试 来找到答案。特别要感谢 Soumith Chintala、Adam Paszke 和 Natalia Gimelshein,他们在早期草稿中提供了深刻的评论并回答了问题。