多进程

一个库,用于启动和管理由函数或二进制文件指定的n个工人子进程。

对于函数,它使用torch.multiprocessing(以及由此使用的Python multiprocessing)来启动或分叉工作进程。对于二进制文件,它使用Python subprocess.Popen来创建工作进程。

用法 1:以函数形式启动两个训练器

from torch.distributed.elastic.multiprocessing import Std, start_processes

def trainer(a, b, c):
    pass # train


# runs two trainers
# LOCAL_RANK=0 trainer(1,2,3)
# LOCAL_RANK=1 trainer(4,5,6)
ctx = start_processes(
        name="trainer",
        entrypoint=trainer,
        args={0: (1,2,3), 1: (4,5,6)},
        envs={0: {"LOCAL_RANK": 0}, 1: {"LOCAL_RANK": 1}},
        log_dir="/tmp/foobar",
        redirects=Std.ALL, # write all worker stdout/stderr to a log file
        tee={0: Std.ERR}, # tee only local rank 0's stderr to console
      )

# waits for all copies of trainer to finish
ctx.wait()

用法 2:以二进制文件形式启动 2 个回显工作进程

# same as invoking
# echo hello
# echo world > stdout.log
ctx = start_processes(
        name="echo"
        entrypoint="echo",
        log_dir="/tmp/foobar",
        args={0: "hello", 1: "world"},
        redirects={1: Std.OUT},
       )

就像torch.multiprocessing一样,函数start_processes()的返回值是一个进程上下文(api.PContext)。如果启动了一个函数,则会返回一个api.MultiprocessContext;如果启动了一个二进制文件,则会返回一个api.SubprocessContext。这两个都是父类api.PContext的具体实现。

启动多个工作进程

torch.distributed.elastic.multiprocessing.start_processes(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None, start_method='spawn')[源代码]

根据提供的选项,启动 nentrypoint 进程。

entrypoint 可以是一个 Callable(函数)或一个 str(二进制文件)。副本的数量由 argsenvs 参数的条目数量决定,这两个参数需要具有相同的键集。

argsenv 参数是要传递给由副本索引(本地排名)映射的入口点的参数和环境变量。所有本地排名都必须被包含在内,也就是说,键集应该是 {0,1,...,(nprocs-1)}

注意

entrypoint 是一个二进制文件(str)时,args 只能是字符串。如果给定其他类型,则会将其转换为字符串形式(例如 str(arg1))。此外,只有当主函数使用了 torch.distributed.elastic.multiprocessing.errors.record 注解时,二进制文件失败才会写入一个 error.json 错误文件。对于函数启动,默认情况下会自动处理这一点,并且不需要手动添加 @record 注解。

redirectstee 是位掩码,用于指定要将哪些标准流重定向到 log_dir 中的日志文件。有效的掩码值在 Std 中定义。为了只针对某些本地 rank 进行重定向,请将 redirects 作为键为本地 rank 的映射传递,以指定其重定向行为。任何未指定的本地 rank 将默认使用 Std.NONE

tee 像 Unix 的“tee”命令一样,将输出重定向并打印到控制台。为了避免工作进程的 stdout 和 stderr 打印到控制台,请使用 redirects 参数。

对于每个进程,log_dir 包含以下内容:

  1. {local_rank}/error.json: 如果进程失败,该文件将包含错误信息

  2. {local_rank}/stdout.json: 如果 redirect & STDOUT == STDOUT

  3. {local_rank}/stderr.json: 如果 redirect & STDERR == STDERR

注意

期望log_dir已经存在、为空,并且是一个目录。

示例:

log_dir = "/tmp/test"

# ok; two copies of foo: foo("bar0"), foo("bar1")
start_processes(
   name="trainer",
   entrypoint=foo,
   args:{0:("bar0",), 1:("bar1",),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
)

# invalid; envs missing for local rank 1
start_processes(
   name="trainer",
   entrypoint=foo,
   args:{0:("bar0",), 1:("bar1",),
   envs:{0:{}},
   log_dir=log_dir
)

# ok; two copies of /usr/bin/touch: touch file1, touch file2
start_processes(
   name="trainer",
   entrypoint="/usr/bin/touch",
   args:{0:("file1",), 1:("file2",),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
 )

# caution; arguments casted to string, runs:
# echo "1" "2" "3" and echo "[1, 2, 3]"
start_processes(
   name="trainer",
   entrypoint="/usr/bin/echo",
   args:{0:(1,2,3), 1:([1,2,3],),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
 )
参数
  • name (str) – 一个描述进程作用的人类可读的短名称,用于在tee输出stdout和stderr时作为标题。

  • entrypoint (Union[Callable, str]) – 可以是一个 Callable(函数)或 cmd(二进制可执行文件)

  • args (Dict[int, Tuple]) – 每个副本的参数

  • envs (Dict[int, Dict[str, str]]) – 每个副本的环境变量

  • log_dir - 用于存放日志文件的目录

  • start_method (str) – 多进程的启动方法(spawn、fork、forkserver),但对二进制文件无效

  • redirects - 将哪些标准流重定向到日志文件中

  • tee - 指定要重定向并输出到控制台的标准流

  • local_ranks_filter — 指定要输出到控制台的排名的日志

返回类型

PContext

进程上下文

classtorch.distributed.elastic.multiprocessing.api.PContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[源代码]

一个标准基类,用于通过不同机制启动的一组进程中执行的操作。

命名 PContext 的目的是为了与 torch.multiprocessing.ProcessContext 区分开来。

警告

stdouts 和 stderrs 应始终包含 tee_stdouts 和 tee_stderrs(分别对应)。这是因为在实现中,tee 是通过重定向和 tail -f <stdout/stderr.log> 来完成的。

torch.distributed.elastic.multiprocessing.api.MultiprocessContext(name, entrypoint, args, envs, start_method, logs_specs, log_line_prefixes=None)[源代码]

PContext 持有着以函数形式调用的 worker 进程。

classtorch.distributed.elastic.multiprocessing.api.SubprocessContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[源代码]

PContext 持有以二进制形式调用的 worker 进程。

torch.distributed.elastic.multiprocessing.api.RunProcsResult(return_values=<factory>, failures=<factory>, stdouts=<factory>, stderrs=<factory>)[源代码]

这是由 start_processes() 启动的进程完成后得到的结果,由 PContext 返回。

请留意以下几点:

  1. 所有字段都根据本地排名进行映射

  2. return_values - 仅适用于函数(不包括二进制文件)。

  3. stdouts - 标准输出日志文件的路径(如果没有重定向,则留空)

  4. stderrs - stderr.log 文件的路径(如果没有重定向,则留空)

torch.distributed.elastic.multiprocessing.api.DefaultLogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[源代码]

默认的 LogsSpecs 实现:

  • 如果 log_dir 不存在,它将被创建

  • 为每种尝试和排名生成嵌套文件夹。

reify(envs)[源代码]

按照以下方案构建日志目标路径:

  • <log_dir>/<rdzv_run_id>/attempt_<attempt>/<rank>/stdout.log

  • <log_dir>/<rdzv_run_id>/attempt_<attempt>/<rank>/stderr.log

  • <log_dir>/<rdzv_run_id>/attempt_<attempt>/<rank>/error.json

返回类型

LogsDest

classtorch.distributed.elastic.multiprocessing.api.LogsDest(stdouts=<factory>, stderrs=<factory>, tee_stdouts=<factory>, tee_stderrs=<factory>, error_files=<factory>)[源代码]

对于每种日志类型,保存从本地rank ID到文件路径的映射。

torch.distributed.elastic.multiprocessing.api.LogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[源代码]

定义每个工作进程的日志处理和重定向。

参数
  • log_dir (Optional[str]) – 日志写入的基础目录。

  • redirects (Union[Std, Dict[int, Std]]): 将流重定向到文件。传递一个单独的 Std 枚举来为所有工作者进行重定向,或者使用 local_rank 作为键的映射来进行选择性重定向。

  • tee (Union[Std, Dict[int, Std]]) – 要复制到 stdout/stderr 的流。传递一个单独的 Std 枚举以对所有工作者进行流复制,或者使用 local_rank 作为键来选择性地复制。

abstractreify(envs)[源代码]

根据环境变量,为每个本地 ranks 构建日志文件的目的地。

Envs 参数包含了为每个本地排名定义的环境变量字典,这些定义在 _start_workers() 中。

返回类型

LogsDest

本页目录