多进程
一个库,用于启动和管理由函数或二进制文件指定的n个工人子进程。
对于函数,它使用torch.multiprocessing
(以及由此使用的Python multiprocessing
)来启动或分叉工作进程。对于二进制文件,它使用Python subprocess.Popen
来创建工作进程。
用法 1:以函数形式启动两个训练器
from torch.distributed.elastic.multiprocessing import Std, start_processes def trainer(a, b, c): pass # train # runs two trainers # LOCAL_RANK=0 trainer(1,2,3) # LOCAL_RANK=1 trainer(4,5,6) ctx = start_processes( name="trainer", entrypoint=trainer, args={0: (1,2,3), 1: (4,5,6)}, envs={0: {"LOCAL_RANK": 0}, 1: {"LOCAL_RANK": 1}}, log_dir="/tmp/foobar", redirects=Std.ALL, # write all worker stdout/stderr to a log file tee={0: Std.ERR}, # tee only local rank 0's stderr to console ) # waits for all copies of trainer to finish ctx.wait()
用法 2:以二进制文件形式启动 2 个回显工作进程
# same as invoking # echo hello # echo world > stdout.log ctx = start_processes( name="echo" entrypoint="echo", log_dir="/tmp/foobar", args={0: "hello", 1: "world"}, redirects={1: Std.OUT}, )
就像torch.multiprocessing
一样,函数start_processes()
的返回值是一个进程上下文(api.PContext
)。如果启动了一个函数,则会返回一个api.MultiprocessContext
;如果启动了一个二进制文件,则会返回一个api.SubprocessContext
。这两个都是父类api.PContext
的具体实现。
启动多个工作进程
- torch.distributed.elastic.multiprocessing.start_processes(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None, start_method='spawn')[源代码]
-
根据提供的选项,启动
n
个entrypoint
进程。entrypoint
可以是一个Callable
(函数)或一个str
(二进制文件)。副本的数量由args
和envs
参数的条目数量决定,这两个参数需要具有相同的键集。args
和env
参数是要传递给由副本索引(本地排名)映射的入口点的参数和环境变量。所有本地排名都必须被包含在内,也就是说,键集应该是{0,1,...,(nprocs-1)}
。注意
当
entrypoint
是一个二进制文件(str
)时,args
只能是字符串。如果给定其他类型,则会将其转换为字符串形式(例如str(arg1)
)。此外,只有当主函数使用了torch.distributed.elastic.multiprocessing.errors.record
注解时,二进制文件失败才会写入一个error.json
错误文件。对于函数启动,默认情况下会自动处理这一点,并且不需要手动添加@record
注解。redirects
和tee
是位掩码,用于指定要将哪些标准流重定向到log_dir
中的日志文件。有效的掩码值在Std
中定义。为了只针对某些本地 rank 进行重定向,请将redirects
作为键为本地 rank 的映射传递,以指定其重定向行为。任何未指定的本地 rank 将默认使用Std.NONE
。tee
像 Unix 的“tee”命令一样,将输出重定向并打印到控制台。为了避免工作进程的 stdout 和 stderr 打印到控制台,请使用redirects
参数。对于每个进程,
log_dir
包含以下内容:-
{local_rank}/error.json
: 如果进程失败,该文件将包含错误信息 -
{local_rank}/stdout.json
: 如果redirect & STDOUT == STDOUT
-
{local_rank}/stderr.json
: 如果redirect & STDERR == STDERR
注意
期望
log_dir
已经存在、为空,并且是一个目录。示例:
log_dir = "/tmp/test" # ok; two copies of foo: foo("bar0"), foo("bar1") start_processes( name="trainer", entrypoint=foo, args:{0:("bar0",), 1:("bar1",), envs:{0:{}, 1:{}}, log_dir=log_dir ) # invalid; envs missing for local rank 1 start_processes( name="trainer", entrypoint=foo, args:{0:("bar0",), 1:("bar1",), envs:{0:{}}, log_dir=log_dir ) # ok; two copies of /usr/bin/touch: touch file1, touch file2 start_processes( name="trainer", entrypoint="/usr/bin/touch", args:{0:("file1",), 1:("file2",), envs:{0:{}, 1:{}}, log_dir=log_dir ) # caution; arguments casted to string, runs: # echo "1" "2" "3" and echo "[1, 2, 3]" start_processes( name="trainer", entrypoint="/usr/bin/echo", args:{0:(1,2,3), 1:([1,2,3],), envs:{0:{}, 1:{}}, log_dir=log_dir )
- 参数
-
-
name (str) – 一个描述进程作用的人类可读的短名称,用于在tee输出stdout和stderr时作为标题。
-
entrypoint (Union[Callable, str]) – 可以是一个
Callable
(函数)或cmd
(二进制可执行文件) -
log_dir - 用于存放日志文件的目录
-
start_method (str) – 多进程的启动方法(spawn、fork、forkserver),但对二进制文件无效
-
redirects - 将哪些标准流重定向到日志文件中
-
tee - 指定要重定向并输出到控制台的标准流
-
local_ranks_filter — 指定要输出到控制台的排名的日志
-
- 返回类型
-
进程上下文
- classtorch.distributed.elastic.multiprocessing.api.PContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[源代码]
-
一个标准基类,用于通过不同机制启动的一组进程中执行的操作。
命名
PContext
的目的是为了与torch.multiprocessing.ProcessContext
区分开来。警告
stdouts 和 stderrs 应始终包含 tee_stdouts 和 tee_stderrs(分别对应)。这是因为在实现中,tee 是通过重定向和 tail -f <stdout/stderr.log> 来完成的。
- 类torch.distributed.elastic.multiprocessing.api.MultiprocessContext(name, entrypoint, args, envs, start_method, logs_specs, log_line_prefixes=None)[源代码]
-
PContext
持有着以函数形式调用的 worker 进程。
- classtorch.distributed.elastic.multiprocessing.api.SubprocessContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[源代码]
-
PContext
持有以二进制形式调用的 worker 进程。
- 类torch.distributed.elastic.multiprocessing.api.RunProcsResult(return_values=<factory>, failures=<factory>, stdouts=<factory>, stderrs=<factory>)[源代码]
-
这是由
start_processes()
启动的进程完成后得到的结果,由PContext
返回。请留意以下几点:
-
所有字段都根据本地排名进行映射
-
return_values
- 仅适用于函数(不包括二进制文件)。 -
stdouts
- 标准输出日志文件的路径(如果没有重定向,则留空) -
stderrs
- stderr.log 文件的路径(如果没有重定向,则留空)
-
- 类torch.distributed.elastic.multiprocessing.api.DefaultLogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[源代码]
-
默认的 LogsSpecs 实现:
-
如果 log_dir 不存在,它将被创建
-
为每种尝试和排名生成嵌套文件夹。
-
- classtorch.distributed.elastic.multiprocessing.api.LogsDest(stdouts=<factory>, stderrs=<factory>, tee_stdouts=<factory>, tee_stderrs=<factory>, error_files=<factory>)[源代码]
-
对于每种日志类型,保存从本地rank ID到文件路径的映射。
- 类torch.distributed.elastic.multiprocessing.api.LogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[源代码]
-
定义每个工作进程的日志处理和重定向。
- 参数