分布式管道并行化简介
作者: Howard Huang
本教程使用了一个基于 GPT 风格的 Transformer 模型,通过 torch.distributed.pipelining API 来演示如何实现分布式管道并行。
你将学到什么
-
如何使用
torch.distributed.pipelining
API -
如何将流水线并行应用于 Transformer 模型
-
如何在一组微批次上利用不同的调度策略
前提条件
设置
使用 torch.distributed.pipelining
,我们将对模型的执行进行分区,并在微批次上调度计算。我们将使用一个简化版的 transformer 解码器模型。该模型架构用于教学目的,包含多个 transformer 解码器层,以便演示如何将模型分割成不同的块。首先,让我们定义模型:
importtorch
importtorch.nnasnn
fromdataclassesimport dataclass
@dataclass
classModelArgs:
dim: int = 512
n_layers: int = 8
n_heads: int = 8
vocab_size: int = 10000
classTransformer(nn.Module):
def__init__(self, model_args: ModelArgs):
super().__init__()
self.tok_embeddings = nn.Embedding(model_args.vocab_size, model_args.dim)
# Using a ModuleDict lets us delete layers witout affecting names,
# ensuring checkpoints will correctly save and load.
self.layers = torch.nn.ModuleDict()
for layer_id in range(model_args.n_layers):
self.layers[str(layer_id)] = nn.TransformerDecoderLayer(model_args.dim, model_args.n_heads)
self.norm = nn.LayerNorm(model_args.dim)
self.output = nn.Linear(model_args.dim, model_args.vocab_size)
defforward(self, tokens: torch.Tensor):
# Handling layers being 'None' at runtime enables easy pipeline splitting
h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens
for layer in self.layers.values():
h = layer(h, h)
h = self.norm(h) if self.norm else h
output = self.output(h).clone() if self.output else h
return output
然后,我们需要在脚本中导入必要的库并初始化分布式训练过程。在本例中,我们定义了一些全局变量以便稍后在脚本中使用:
importos
importtorch.distributedasdist
fromtorch.distributed.pipeliningimport pipeline, SplitPoint, PipelineStage, ScheduleGPipe
global rank, device, pp_group, stage_index, num_stages
definit_distributed():
global rank, device, pp_group, stage_index, num_stages
rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])
device = torch.device(f"cuda:{rank}") if torch.cuda.is_available() else torch.device("cpu")
dist.init_process_group()
# This group can be a sub-group in the N-D parallel case
pp_group = dist.new_group()
stage_index = rank
num_stages = world_size
rank
、world_size
和 init_process_group()
代码对您来说应该很熟悉,因为它们在所有分布式程序中都很常用。与流水线并行相关的全局变量包括 pp_group
,它是用于发送/接收通信的进程组;stage_index
,在这个例子中,每个阶段只有一个 rank,因此索引等同于 rank;以及 num_stages
,它等同于 world_size。
num_stages
用于设置流水线并行调度中使用的阶段数。例如,对于 num_stages=4
,一个微批次需要经过 4 次前向和 4 次反向传播才能完成。stage_index
对于框架来说至关重要,因为它决定了如何在各个阶段之间进行通信。例如,对于第一个阶段(stage_index=0
),它将使用来自数据加载器的数据,并且不需要从任何前一个节点接收数据来执行其计算。
步骤 1:划分 Transformer 模型
有两种不同的模型分区方式:
第一种是手动模式,我们可以通过删除模型的部分属性来手动创建两个模型实例。在这个例子中,对于两个阶段(2个等级),模型被一分为二。
defmanual_model_split(model) -> PipelineStage:
if stage_index == 0:
# prepare the first stage model
for i in range(4, 8):
del model.layers[str(i)]
model.norm = None
model.output = None
elif stage_index == 1:
# prepare the second stage model
for i in range(4):
del model.layers[str(i)]
model.tok_embeddings = None
stage = PipelineStage(
model,
stage_index,
num_stages,
device,
)
return stage
如我们所见,第一阶段不包含层归一化或输出层,仅包含前四个 Transformer 块。第二阶段不包含输入嵌入层,但包含输出层和最后四个 Transformer 块。该函数随后返回当前 rank 的 PipelineStage
。
第二种方法是基于 tracer 的模式,它会根据 split_spec
参数自动拆分模型。通过使用 pipeline 规范,我们可以指示 torch.distributed.pipelining
在何处拆分模型。在以下代码块中,我们在第四个 Transformer 解码器层之前进行拆分,与上述手动拆分方式一致。同样地,在拆分完成后,我们可以通过调用 build_stage
来获取一个 PipelineStage
。
第二步:定义主执行逻辑
在主函数中,我们将创建一个特定的管道调度计划,各个阶段应遵循该计划。torch.distributed.pipelining
支持多种调度计划,包括单阶段单进程的调度计划 GPipe
和 1F1B
,以及多阶段单进程的调度计划,如 Interleaved1F1B
和 LoopedBFS
。
if __name__ == "__main__":
init_distributed()
num_microbatches = 4
model_args = ModelArgs()
model = Transformer(model_args)
# Dummy data
x = torch.ones(32, 500, dtype=torch.long)
y = torch.randint(0, model_args.vocab_size, (32, 500), dtype=torch.long)
example_input_microbatch = x.chunk(num_microbatches)[0]
# Option 1: Manual model splitting
stage = manual_model_split(model)
# Option 2: Tracer model splitting
# stage = tracer_model_split(model, example_input_microbatch)
model.to(device)
x = x.to(device)
y = y.to(device)
deftokenwise_loss_fn(outputs, targets):
loss_fn = nn.CrossEntropyLoss()
outputs = outputs.reshape(-1, model_args.vocab_size)
targets = targets.reshape(-1)
return loss_fn(outputs, targets)
schedule = ScheduleGPipe(stage, n_microbatches=num_microbatches, loss_fn=tokenwise_loss_fn)
if rank == 0:
schedule.step(x)
elif rank == 1:
losses = []
output = schedule.step(target=y, losses=losses)
print(f"losses: {losses}")
dist.destroy_process_group()
在上面的示例中,我们使用手动方法来拆分模型,但可以取消注释代码以尝试基于追踪器的模型拆分功能。在我们的调度器中,我们需要传入微批次数和用于评估目标的损失函数。
.step()
函数会处理整个小批量,并根据之前传入的 n_microbatches
自动将其拆分为多个微批次。然后,这些微批次会根据调度类进行操作。在上面的示例中,我们使用的是 GPipe,它遵循简单的前向传播然后反向传播的调度。从 rank 1 返回的输出将与模型在单个 GPU 上运行并使用整个批次时的输出相同。同样地,我们可以传入一个 losses
容器来存储每个微批次对应的损失。
步骤 3: 启动分布式进程
最后,我们准备运行脚本。我们将使用 torchrun
创建一个单主机、双进程的任务。我们的脚本已经编写好了,rank 0 执行管道阶段 0 所需的逻辑,而 rank 1 则执行管道阶段 1 的逻辑。
torchrun --nnodes 1 --nproc_per_node 2 pipelining_tutorial.py
总结
在本教程中,我们学习了如何使用 PyTorch 的 torch.distributed.pipelining
API 实现分布式流水线并行。我们探讨了如何设置环境、定义一个 Transformer 模型,并对其进行分区以进行分布式训练。我们讨论了两种模型分区方法:手动分区和基于追踪器的分区,并演示了如何在不同阶段调度微批次的计算。最后,我们介绍了如何执行流水线调度以及如何使用 torchrun
启动分布式进程。
附加资源
我们已成功将 torch.distributed.pipelining
集成到 torchtitan 仓库中。TorchTitan 是一个简洁、最小化的代码库,用于使用原生 PyTorch 进行大规模 LLM 训练。如需了解生产环境中管道并行性及其与其他分布式技术组合的使用,请参阅 TorchTitan 的 3D 并行性端到端示例。