多进程最佳做法

torch.multiprocessing 是 Python 的 multiprocessing 模块的一个替代品。它支持完全相同的操作,但进行了扩展:所有通过 multiprocessing.Queue 发送的张量数据会被移动到共享内存中,并且只会将句柄发送给另一个进程。

注意

当一个 Tensor 被发送到另一个进程时,Tensor 的数据会被共享。如果 torch.Tensor.grad 不为 None,它也会被共享。当一个没有 torch.Tensor.grad 字段的 Tensor 被发送到另一个进程后,会创建一个特定于该进程的标准 .grad Tensor。这个 .grad 不像 Tensor 的数据那样会在所有进程中自动共享。

这可以实现各种训练方法,例如 Hogwild、A3C 或其他需要异步操作的方法。

多进程中的CUDA应用

CUDA 运行时不支持 fork 启动方法;要在子进程中使用 CUDA,必须使用 spawnforkserver 方法。

注意

可以使用 multiprocessing.get_context(...) 创建上下文,或者直接使用 multiprocessing.set_start_method(...) 来设置启动方法。

与CPU张量不同,发送进程需要在接收进程保留张量副本期间保持原始张量不变。虽然这在底层已经实现,但用户仍需遵循最佳实践以确保程序正确运行。例如,只要消费者进程引用了该张量,发送进程就必须保持活动状态,并且如果消费者进程通过致命信号异常退出,则引用计数无法解决问题。参见此部分

另请参阅:使用 nn.parallel.DistributedDataParallel 而不是 multiprocessing 或 nn.DataParallel

最佳实践与技巧

避免和应对死锁

在创建新进程时,可能会遇到许多问题,最常见的死锁原因是后台线程。如果有任何线程持有锁或导入模块,并且调用了 fork,那么子进程很可能会处于损坏状态并导致死锁或其他失败情况。需要注意的是,即使你自己没有这样做,Python 的内置库也会这样做——例如查看multiprocessing即可。multiprocessing.Queue实际上是一个非常复杂的类,它会启动多个线程用于序列化、发送和接收对象,并且这些线程也可能导致上述问题。如果你发现自己处于这种情况,请尝试使用不使用任何额外线程的 SimpleQueue

我们尽力让你的使用更便捷,并防止死锁的发生,但有些事情不在我们的掌控之中。如果你遇到了暂时无法解决的问题,可以在论坛上寻求帮助,我们会看看是否能解决这个问题。

重复使用通过队列传递的缓冲区

请记住,每次将一个Tensor放入到multiprocessing.Queue中时,都需要将其移动到共享内存。如果它已经是共享的,则不会执行任何操作;否则会增加额外的内存复制,这可能会减慢整个进程的速度。即使你有一个进程池将数据发送给单个进程,也要让它返回缓冲区——这是几乎免费的操作,并且可以让你在发送下一个批次时避免复制。

异步多进程训练(如 Hogwild)

使用torch.multiprocessing,可以异步训练模型。参数可以在整个过程中共享,或者定期同步。在第一种情况下,我们建议传递整个模型对象;而在第二种情况下,我们建议只发送state_dict()

我们推荐使用multiprocessing.Queue在进程间传递各种PyTorch对象。例如,当使用fork启动方法时,可以继承已经在共享内存中的张量和存储,但这非常容易出错,并且应该谨慎使用,仅限高级用户使用。尽管队列有时不是最优雅的解决方案,但在所有情况下都能正常工作。

警告

你应该避免编写未用if __name__ == '__main__'保护的全局语句。如果使用了不同于fork的启动方法,这些语句将会在所有子进程中执行。

Hogwild

一个具体的Hogwild实现可以在示例仓库中找到。此外,为了展示代码的整体结构,这里还提供了一个简化的示例:

import torch.multiprocessing as mp
from model import MyModel

def train(model):
    # Construct data_loader, optimizer, etc.
    for data, labels in data_loader:
        optimizer.zero_grad()
        loss_fn(model(data), labels).backward()
        optimizer.step()  # This will update the shared parameters

if __name__ == '__main__':
    num_processes = 4
    model = MyModel()
    # NOTE: this is required for the ``fork`` method to work
    model.share_memory()
    processes = []
    for rank in range(num_processes):
        p = mp.Process(target=train, args=(model,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

多进程中的 CPU

不恰当的多进程处理会导致 CPU 资源过度分配,使不同进程竞争有限的 CPU 资源,从而降低效率。

本教程将解释什么是CPU超订以及如何避免它。

CPU 超分配

CPU 超分配是指分配给系统的虚拟 CPU(vCPU)总数超过硬件上实际可用的 vCPU 总数的一种技术现象。

这会导致严重的CPU资源竞争。在这种情况下,进程之间的频繁切换会增加进程切换的开销,并降低整个系统的效率。

查看 示例仓库 中 Hogwild 实现的代码示例,了解 CPU 超分配的情形。

当使用以下命令在 CPU 上用 4 个进程运行训练示例时:

pythonmain.py--num-processes4

假设机器上有 N 个虚拟中央处理器(vCPU),执行上述命令将生成 4 个子进程。每个子进程都会为自己分配 N 个 vCPU,总共需要 4*N 个 vCPU。然而,机器上只有 N 个 vCPU 可用。因此,不同的进程会竞争资源,并导致频繁的进程切换。

以下观察表明存在CPU过度分配的问题:

  1. 高CPU利用率:通过使用htop命令,你可以观察到CPU一直保持在较高水平,经常达到或超过其最大容量。这表明对CPU资源的需求超过了可用的物理核心数量,导致进程之间争夺CPU时间的竞争。

  2. 频繁的上下文切换和低系统效率:在 CPU 资源过度订阅的情况下,多个进程会竞争有限的 CPU 时间。操作系统需要快速地在这多个进程之间进行切换以实现资源的公平分配。这种频繁的上下文切换会导致额外的开销,并降低系统的整体运行效率。

避免CPU过度使用

避免CPU过度订阅的一个好办法是合理分配资源,确保同时运行的进程或线程数量不超过可用的CPU资源。

在这种情况下,解决办法是在子进程中指定合适的线程数量。可以使用torch.set_num_threads(int)函数为每个进程设置线程数。

假设机器上有 N 个 vCPU 并且将生成 M 个进程,每个进程中使用的最大 num_threads 值为 floor(N/M)。为了防止在 mnist_hogwild 示例中出现 CPU 资源过度订阅的情况,需要对示例仓库中的train.py文件进行以下修改。

def train(rank, args, model, device, dataset, dataloader_kwargs):
    torch.manual_seed(args.seed + rank)

    #### define the num threads used in current sub-processes
    torch.set_num_threads(floor(N/M))

    train_loader = torch.utils.data.DataLoader(dataset, **dataloader_kwargs)

    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
    for epoch in range(1, args.epochs + 1):
        train_epoch(epoch, args, model, device, train_loader, optimizer)

使用torch.set_num_threads(floor(N/M))为每个进程设置num_thread值。其中,N表示可用的vCPU数量,M表示选择的进程数。具体的num_thread值会根据任务的不同而变化。然而,作为一般指导原则,num_thread的最大值应为floor(N/M)以避免CPU过度订阅。在mnist_hogwild训练示例中,在避免了CPU过度订阅之后,可以实现30倍的性能提升。

本页目录