多进程包 - torch.multiprocessing

torch.multiprocessing 是基于原生 multiprocessing 模块的一个包装器。

它注册了自定义的reducers,这些reducers使用共享内存来提供不同进程中同一数据的共享视图。一旦张量/存储被移动到共享内存(参见share_memory_()),就可以在不进行复制的情况下将其发送给其他进程。

该 API 完全兼容原始模块——只需将 import multiprocessing 更改为 import torch.multiprocessing,所有张量就会被移动到共享内存中,无论是通过队列发送还是通过其他机制进行共享。

由于 API 相似,我们没有记录此包的大部分内容,并建议参考原模块的详细文档。

警告

如果主进程因意外原因(如接收信号)突然退出,Python的multiprocessing有时无法正确清理其子进程。这是一个已知的问题,所以如果你在中断解释器后发现有资源泄漏的情况,很可能就是这个问题导致的。

策略管理

torch.multiprocessing.get_all_sharing_strategies()[源代码]

返回当前系统中支持的共享策略集。

torch.multiprocessing.get_sharing_strategy()[源代码]

返回当前用于共享 CPU 张量的策略。

torch.multiprocessing.set_sharing_strategy(new_strategy)[源代码]

设定共享CPU张量的策略。

参数

new_strategy (str) – 选定的策略名称。应为 get_all_sharing_strategies() 返回值之一。

CUDA张量的共享

仅在 Python 3 中,使用 spawnforkserver 启动方法支持在进程间共享 CUDA 张量。

与CPU张量不同,发送进程需要在整个接收进程中持有张量副本期间保留原始张量。虽然引用计数在后台自动处理,但仍需用户遵守以下最佳实践。

警告

如果消费者进程因收到致命信号而异常终止,那么只要发送进程还在运行,共享张量就可能一直保留在内存中。

  1. 请尽快在消费者中释放内存。

## Good
x = queue.get()
# do somethings with x
del x
## Bad
x = queue.get()
# do somethings with x
# do everything else (producer have to keep x in memory)

2. 让生产者进程一直运行,直到所有消费者都退出。这样可以避免生产者进程在内存仍被消费者使用时释放内存的问题。

## producer
# send tensors, do something
event.wait()
## consumer
# receive tensors and use them
event.set()
  1. 不要传递接收的张量。

# not going to work
x = queue.get()
queue_2.put(x)
# you need to create a process-local copy
x = queue.get()
x_clone = x.clone()
queue_2.put(x_clone)
# putting and getting from the same queue in the same process will likely end up with segfault
queue.put(tensor)
x = queue.get()

分享策略

本节提供了一种不同的共享策略工作原理的简要概述。需要注意的是,这仅适用于CPU张量,而CUDA张量则始终使用CUDA API,因为这是它们能够被共享的唯一方式。

文件描述符 - file_descriptor

注意

这是默认策略(除 macOS 和 OS X 外,后者不支持此功能)。

此策略将使用文件描述符作为共享内存句柄。每当存储被移动到共享内存时,会从 shm_open 获得一个文件描述符并将其缓存到对象中。当需要发送给其他进程时,该文件描述符会通过(例如 UNIX 套接字)传递给接收方,并由其进行缓存和使用 mmap 来获取存储数据的共享视图。

注意,如果有很多张量需要共享,此策略会一直保持大量文件描述符处于打开状态。如果你系统的可打开文件描述符数量有限制,并且你无法提高这个限制,你应该使用file_system 策略。

文件系统 - file_system

此策略使用传递给 shm_open 的文件名来识别共享内存区域。这样可以避免实现中需要缓存从该操作获取的文件描述符,但同时容易导致共享内存泄漏。由于其他进程需要访问这些文件以打开其视图,因此不能在创建后立即删除它们。如果进程因致命错误崩溃或被杀死,并且没有调用存储析构函数,则这些文件将保留在系统中。这非常严重,因为它们会一直占用内存,直到系统重启或手动释放。

为了解决共享内存文件泄漏的问题,torch.multiprocessing 会启动一个名为 torch_shm_manager 的守护进程。该守护进程将与当前进程组隔离,并跟踪所有共享内存分配。一旦所有连接到它的进程退出,它会等待一段时间以确保没有新的连接出现,然后遍历并检查该组中所有已分配的共享内存文件。如果发现任何未被释放的共享内存文件,则会将其释放。我们已经测试了这种方法,证明其对各种故障具有鲁棒性。然而,如果你的系统有足够的限制,并且 file_descriptor 是一个支持的策略,我们不建议切换到这个方法。

生成子进程

注意

支持 Python 3.4 及以上版本。

这取决于Python的 包中的 spawn启动方法。

可以通过创建Process实例并调用join来等待它们完成,从而启动多个子进程以执行某些功能。这种方法在处理单个子进程时效果很好,但在处理多个进程时可能会遇到一些问题。

也就是说,顺序连接进程意味着它们会依次终止。如果进程未能如预期那样终止,特别是第一个进程没有终止的情况下,这种情况可能不会被发现。另外,系统中也没有内置机制来处理和传播错误。

下面的 spawn 函数解决了这些问题,处理了错误传播和乱序终止的问题,并且一旦检测到一个进程出错,就会主动终止所有进程。

torch.multiprocessing.spawn.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')[源代码]

创建 nprocs 个进程,每个进程用 args 参数来运行 fn 函数。

如果其中一个进程以非零退出状态退出,其余进程将被杀死,并抛出一个包含终止原因的异常。如果子进程中捕获到异常,该异常将被转发,并将其回溯信息包含在父进程中抛出的异常中。

参数
  • fn (函数) –

    该函数被调用作为派生进程的入口点。它必须在模块的顶层定义,以便可以被序列化和启动。这是multiprocessing模块的要求。

    该函数的调用方式为 fn(i, *args),其中 i 表示进程索引,args 是一个包含所有参数的元组。

  • args (元组) – 传递给函数 fn 的参数。

  • nprocs (int) – 指定要创建的进程数。

  • join (bool) – 对所有进程进行阻塞式的等待连接。

  • daemon (bool) – 孵化进程的守护进程标志。若设置为 True,将创建守护进程。

  • start_method (str) – (已弃用)此参数始终使用 spawn 作为启动方法。若要使用其他启动方法,请调用 start_processes()

返回值

如果 joinTrue,则返回 None;如果 joinFalse,则返回 ProcessContext

torch.multiprocessing.SSpawnContext[源代码]

在使用join=False调用spawn()时返回的结果。

join(timeout=None)

在一个或多个进程中加入spawn上下文。

尝试在此 spawn 上下文中启动一个或多个进程。如果其中一个进程以非零退出状态终止,该函数会终止其余的进程,并抛出一个包含第一个进程退出原因的异常。

如果所有进程都已成功加入,则返回True,否则如果有更多的进程需要加入,则返回False

参数

timeout (float) – 等待超时时间,超过这个时间则放弃等待。

本页目录