PyTorch 入门指南
学习 PyTorch
图像和视频
音频
后端
强化学习
在生产环境中部署 PyTorch 模型
Profiling PyTorch
代码变换与FX
前端API
扩展 PyTorch
模型优化
并行和分布式训练
边缘端的 ExecuTorch
推荐系统
多模态

使用 Cpp 扩展自定义进程组后端

作者: Howard Huang, Feng Tian, Shen Li, Min Si

先决条件:

本教程演示了如何实现一个自定义的 Backend 并将其通过 cpp extensions 集成到 PyTorch 分布式包 中。当您需要为硬件定制专门的软件栈,或者希望尝试新的集体通信算法时,这将非常有用。

基础知识

PyTorch 的集体通信功能支持多种广泛采用的分布式训练特性,包括 DistributedDataParallelZeroRedundancyOptimizer。为了使相同的集体通信 API 能够与不同的通信后端协同工作,分布式包将集体通信操作抽象为 Backend 类。然后,不同的后端可以使用首选的第三方库实现为 Backend 的子类。PyTorch 分布式提供了三个默认后端:ProcessGroupNCCLProcessGroupGlooProcessGroupMPI。然而,除了这三个后端之外,还有其他通信库(例如 UCCOneCCL)、不同类型的硬件(例如 TPUTrainum)以及新兴的通信算法(例如 HerringReduction Server)。因此,分布式包提供了扩展 API,以支持自定义集体通信后端。

以下四个步骤展示了如何实现一个虚拟的 Backend 后端,并在 Python 应用程序代码中使用它。请注意,本教程的重点是演示扩展 API,而非开发一个功能完整的通信后端。因此,这个 dummy 后端仅涵盖了部分 API(all_reduceall_gather),并且简单地将张量的值设置为 0。

第一步:实现 Backend 的子类

第一步是实现一个 Backend 子类,该类重写目标集体通信 API 并运行自定义通信算法。扩展还需要实现一个 Work 子类,该类作为通信结果的未来对象,并允许在应用代码中异步执行。如果扩展使用第三方库,它可以在 BackendDummy 子类中包含头文件并调用库 API。下面的两个代码片段展示了 dummy.hdummy.cpp 的实现。完整的实现请参考 dummy collectives 仓库。

// file name: dummy.hpp
#include<torch/python.h>

#include<torch/csrc/distributed/c10d/Backend.hpp>
#include<torch/csrc/distributed/c10d/Work.hpp>
#include<torch/csrc/distributed/c10d/Store.hpp>
#include<torch/csrc/distributed/c10d/Types.hpp>
#include<torch/csrc/distributed/c10d/Utils.hpp>

#include<pybind11/chrono.h>

namespacec10d{

classBackendDummy:publicBackend{
public:
BackendDummy(intrank,intsize);

c10::intrusive_ptr<Work>allgather(
std::vector<std::vector<at::Tensor>>&outputTensors,
std::vector<at::Tensor>&inputTensors,
constAllgatherOptions&opts=AllgatherOptions())override;

c10::intrusive_ptr<Work>allreduce(
std::vector<at::Tensor>&tensors,
constAllreduceOptions&opts=AllreduceOptions())override;

// The collective communication APIs without a custom implementation
// will error out if invoked by application code.
};

classWorkDummy:publicWork{
public:
WorkDummy(
OpTypeopType,
c10::intrusive_ptr<c10::ivalue::Future>future)// future of the output
:Work(
*1,// rank, only used by recvAnySource, irrelevant in this demo
opType),
future_(std::move(future)){}
boolisCompleted()override;
boolisSuccess()constoverride;
boolwait(std::chrono::millisecondstimeout=kUnsetTimeout)override;
virtualc10::intrusive_ptr<c10::ivalue::Future>getFuture()override;

private:
c10::intrusive_ptr<c10::ivalue::Future>future_;
};
}// namespace c10d
// file name: dummy.cpp
#include"dummy.hpp"

namespacec10d{

// This is a dummy allgather that sets all output tensors to zero
// Modify the implementation to conduct real communication asynchronously
c10::intrusive_ptr<Work>BackendDummy::allgather(
std::vector<std::vector<at::Tensor>>&outputTensors,
std::vector<at::Tensor>&inputTensors,
constAllgatherOptions&/* unused */){
for(auto&outputTensorVec:outputTensors){
for(auto&outputTensor:outputTensorVec){
outputTensor.zero_();
}
}

autofuture=c10::make_intrusive<c10::ivalue::Future>(
c10::ListType::create(c10::ListType::create(c10::TensorType::get())));
future->markCompleted(c10::IValue(outputTensors));
returnc10::make_intrusive<WorkDummy>(OpType::ALLGATHER,std::move(future));
}

// This is a dummy allreduce that sets all output tensors to zero
// Modify the implementation to conduct real communication asynchronously
c10::intrusive_ptr<Work>BackendDummy::allreduce(
std::vector<at::Tensor>&tensors,
constAllreduceOptions&opts){
for(auto&tensor:tensors){
tensor.zero_();
}

autofuture=c10::make_intrusive<c10::ivalue::Future>(
c10::ListType::create(c10::TensorType::get()));
future->markCompleted(c10::IValue(tensors));
returnc10::make_intrusive<WorkDummy>(OpType::ALLGATHER,std::move(future));
}
}// namespace c10d

第二步:公开扩展的 Python API

后端构造函数是从Python 端调用的,因此扩展还需要将构造函数 API 暴露给 Python。可以通过添加以下方法来实现。在这个例子中,BackendDummy 实例化方法忽略了 storetimeout,因为这些在虚拟实现中没有使用。然而,实际应用中的扩展应该考虑使用 store 来进行协调,并支持 timeout 参数。

// file name: dummy.hpp
classBackendDummy:publicBackend{
...
<Step1code>
...

staticc10::intrusive_ptr<Backend>createBackendDummy(
constc10::intrusive_ptr<::c10d::Store>&store,
intrank,
intsize,
conststd::chrono::duration<float>&timeout);

staticvoidBackendDummyConstructor()__attribute__((constructor)){
py::objectmodule=py::module::import("torch.distributed");
py::objectregister_backend=
module.attr("Backend").attr("register_backend");
// torch.distributed.Backend.register_backend will add `dummy` as a
// new valid backend.
register_backend("dummy",py::cpp_function(createBackendDummy));
}
}
// file name: dummy.cpp
c10::intrusive_ptr<Backend>BackendDummy::createBackendDummy(
constc10::intrusive_ptr<::c10d::Store>&/* unused */,
intrank,
intsize,
conststd::chrono::duration<float>&/* unused */){
returnc10::make_intrusive<BackendDummy>(rank,size);
}

PYBIND11_MODULE(TORCH_EXTENSION_NAME,m){
m.def("createBackendDummy",&BackendDummy::createBackendDummy);
}

第三步:构建自定义扩展

现在,扩展源代码文件已经准备就绪。接下来,我们可以使用 cpp extensions 来构建它。为此,创建一个 setup.py 文件,用于配置路径和命令。然后调用 python setup.py develop 来安装扩展。

如果扩展依赖于第三方库,您还可以为 cpp extension API 指定 libraries_dirslibraries。可以参考 torch ucc 项目作为实际示例。

# file name: setup.py
importos
importsys
importtorch
fromsetuptoolsimport setup
fromtorch.utilsimport cpp_extension

sources = ["src/dummy.cpp"]
include_dirs = [f"{os.path.dirname(os.path.abspath(__file__))}/include/"]

if torch.cuda.is_available():
    module = cpp_extension.CUDAExtension(
        name = "dummy_collectives",
        sources = sources,
        include_dirs = include_dirs,
    )
else:
    module = cpp_extension.CppExtension(
        name = "dummy_collectives",
        sources = sources,
        include_dirs = include_dirs,
    )

setup(
    name = "Dummy-Collectives",
    version = "0.0.1",
    ext_modules = [module],
    cmdclass={'build_ext': cpp_extension.BuildExtension}
)

步骤 4:在应用程序中使用扩展

安装后,您可以方便地在调用 init_process_group 时使用 dummy 后端,就像它是一个内置后端一样。

我们可以通过更改 init_process_groupbackend 参数来基于后端进行调度。通过将 cpu:gloo,cuda:dummy 指定为后端参数,我们可以将带有 CPU 张量的集体操作调度到 gloo 后端,并将带有 CUDA 张量的集体操作调度到 dummy 后端。

要将所有张量发送到 dummy 后端,我们可以简单地将 dummy 指定为后端参数。

importos

importtorch
# importing dummy_collectives makes torch.distributed recognize `dummy`
# as a valid backend.
importdummy_collectives

importtorch.distributedasdist

os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'

# Alternatively:
# dist.init_process_group("dummy", rank=0, world_size=1)
dist.init_process_group("cpu:gloo,cuda:dummy", rank=0, world_size=1)

# this goes through gloo
x = torch.ones(6)
dist.all_reduce(x)
print(f"cpu allreduce: {x}")

# this goes through dummy
if torch.cuda.is_available():
    y = x.cuda()
    dist.all_reduce(y)
    print(f"cuda allreduce: {y}")

    try:
        dist.broadcast(y, 0)
    except RuntimeError:
        print("got RuntimeError when calling broadcast")
本页目录