TorchScript 中的动态并行性
TorchScript 不再处于积极开发阶段。
在本教程中,我们将介绍在 TorchScript 中实现动态交互并行的语法。这种并行性具有以下特性:
-
动态 - 创建的并行任务数量及其工作量可以取决于程序的控制流程。
-
操作间 - 并行性关注的是并行运行 TorchScript 程序片段。这与操作内并行性不同,后者关注的是拆分单个操作符并并行运行操作符工作的子集。
基本语法
动态并行的两个重要 API 是:
-
torch.jit.fork(fn : Callable[..., T], *args, **kwargs) -> torch.jit.Future[T]
-
torch.jit.wait(fut : torch.jit.Future[T]) -> T
展示这些如何工作的一个好方法是通过一个示例:
importtorch
deffoo(x):
return torch.neg(x)
@torch.jit.script
defexample(x):
# Call `foo` using parallelism:
# First, we "fork" off a task. This task will run `foo` with argument `x`
future = torch.jit.fork(foo, x)
# Call `foo` normally
x_normal = foo(x)
# Second, we "wait" on the task. Since the task may be running in
# parallel, we have to "wait" for its result to become available.
# Notice that by having lines of code between the "fork()" and "wait()"
# call for a given Future, we can overlap computations so that they
# run in parallel.
x_parallel = torch.jit.wait(future)
return x_normal, x_parallel
print(example(torch.ones(1))) # (-1., -1.)
fork()
接受可调用对象 fn
以及该可调用对象的参数 args
和 kwargs
,并为 fn
的执行创建一个异步任务。fn
可以是一个函数、方法或 Module 实例。fork()
返回一个对此执行结果值的引用,称为 Future
。由于 fork
在创建异步任务后立即返回,因此在执行 fork()
调用之后的代码行时,fn
可能尚未被执行。因此,使用 wait()
来等待异步任务完成并返回值。
这些结构可以用于在函数内部重叠执行语句(如示例部分所示),或者与其他语言结构(如循环)组合使用:
importtorch
fromtypingimport List
deffoo(x):
return torch.neg(x)
@torch.jit.script
defexample(x):
futures : List[torch.jit.Future[torch.Tensor]] = []
for _ in range(100):
futures.append(torch.jit.fork(foo, x))
results = []
for future in futures:
results.append(torch.jit.wait(future))
return torch.sum(torch.stack(results))
print(example(torch.ones([])))
当我们初始化一个空的 Futures 列表时,需要为
futures
添加显式的类型注解。在 TorchScript 中,空容器默认假设它们包含 Tensor 值,因此我们将列表构造器注解为List[torch.jit.Future[torch.Tensor]]
类型。
这个示例使用 fork()
启动了 100 个 foo
函数的实例,等待这 100 个任务完成,然后对结果进行求和,最终返回 -100.0
。
应用示例:双向 LSTM 集成
让我们尝试将并行化应用到一个更实际的例子中,看看能从中获得怎样的性能提升。首先,我们定义基线模型:一个由双向 LSTM 层组成的集成模型。
importtorch,time
# In RNN parlance, the dimensions we care about are:
# # of time-steps (T)
# Batch size (B)
# Hidden size/number of "channels" (C)
T, B, C = 50, 50, 1024
# A module that defines a single "bidirectional LSTM". This is simply two
# LSTMs applied to the same sequence, but one in reverse
classBidirectionalRecurrentLSTM(torch.nn.Module):
def__init__(self):
super().__init__()
self.cell_f = torch.nn.LSTM(input_size=C, hidden_size=C)
self.cell_b = torch.nn.LSTM(input_size=C, hidden_size=C)
defforward(self, x : torch.Tensor) -> torch.Tensor:
# Forward layer
output_f, _ = self.cell_f(x)
# Backward layer. Flip input in the time dimension (dim 0), apply the
# layer, then flip the outputs in the time dimension
x_rev = torch.flip(x, dims=[0])
output_b, _ = self.cell_b(torch.flip(x, dims=[0]))
output_b_rev = torch.flip(output_b, dims=[0])
return torch.cat((output_f, output_b_rev), dim=2)
# An "ensemble" of `BidirectionalRecurrentLSTM` modules. The modules in the
# ensemble are run one-by-one on the same input then their results are
# stacked and summed together, returning the combined result.
classLSTMEnsemble(torch.nn.Module):
def__init__(self, n_models):
super().__init__()
self.n_models = n_models
self.models = torch.nn.ModuleList([
BidirectionalRecurrentLSTM() for _ in range(self.n_models)])
defforward(self, x : torch.Tensor) -> torch.Tensor:
results = []
for model in self.models:
results.append(model(x))
return torch.stack(results).sum(dim=0)
# For a head-to-head comparison to what we're going to do with fork/wait, let's
# instantiate the model and compile it with TorchScript
ens = torch.jit.script(LSTMEnsemble(n_models=4))
# Normally you would pull this input out of an embedding table, but for the
# purpose of this demo let's just use random data.
x = torch.rand(T, B, C)
# Let's run the model once to warm up things like the memory allocator
ens(x)
x = torch.rand(T, B, C)
# Let's see how fast it runs!
s = time.time()
ens(x)
print('Inference took', time.time() - s, ' seconds')
在我的机器上,这个网络的运行时间是 2.05
秒。我们还可以做得更好!
并行化前向与后向层
我们可以做的一件非常简单的事情是并行化 BidirectionalRecurrentLSTM
中的前向层和后向层。为此,计算的结构是静态的,因此我们实际上甚至不需要任何循环。让我们像这样重写 BidirectionalRecurrentLSTM
的 forward
方法:
defforward(self, x : torch.Tensor) -> torch.Tensor:
# Forward layer - fork() so this can run in parallel to the backward
# layer
future_f = torch.jit.fork(self.cell_f, x)
# Backward layer. Flip input in the time dimension (dim 0), apply the
# layer, then flip the outputs in the time dimension
x_rev = torch.flip(x, dims=[0])
output_b, _ = self.cell_b(torch.flip(x, dims=[0]))
output_b_rev = torch.flip(output_b, dims=[0])
# Retrieve the output from the forward layer. Note this needs to happen
# *after* the stuff we want to parallelize with
output_f, _ = torch.jit.wait(future_f)
return torch.cat((output_f, output_b_rev), dim=2)
在这个例子中,forward()
将 cell_f
的执行委托给另一个线程,同时继续执行 cell_b
。这导致两个单元格的执行相互重叠。
通过这个简单的修改再次运行脚本,运行时间缩短至 1.71
秒,性能提升了 17%
!
旁注:并行性的可视化
我们尚未完成模型的优化,但值得介绍我们用于可视化性能的工具。其中一个重要工具是 PyTorch profiler。
让我们结合 Chrome 的跟踪导出功能使用 profiler 来可视化并行化模型的性能:
with torch.autograd.profiler.profile() as prof:
ens(x)
prof.export_chrome_trace('parallel.json')
这段代码将生成一个名为 parallel.json
的文件。如果您在 Google Chrome 中访问 chrome://tracing
,点击 Load
按钮并加载该 JSON 文件,您将看到类似以下的时间线:
时间线的横轴表示时间,纵轴表示执行线程。正如我们所看到的,我们同时运行了两个 lstm
实例。这是我们为双向层并行化所付出的努力的结果!
并行化集成中的模型
您可能已经注意到,我们的代码中还存在进一步的并行化机会:我们还可以并行运行 LSTMEnsemble
中包含的模型。实现这一点的方法相当简单,以下是我们应该如何更改 LSTMEnsemble
的 forward
方法的示例:
defforward(self, x : torch.Tensor) -> torch.Tensor:
# Launch tasks for each model
futures : List[torch.jit.Future[torch.Tensor]] = []
for model in self.models:
futures.append(torch.jit.fork(model, x))
# Collect the results from the launched tasks
results : List[torch.Tensor] = []
for future in futures:
results.append(torch.jit.wait(future))
return torch.stack(results).sum(dim=0)
或者,如果您注重简洁性,我们可以使用列表推导式:
defforward(self, x : torch.Tensor) -> torch.Tensor:
futures = [torch.jit.fork(model, x) for model in self.models]
results = [torch.jit.wait(fut) for fut in futures]
return torch.stack(results).sum(dim=0)
正如引言所述,我们使用了循环来为集成模型中的每个模型分配任务。然后,我们使用了另一个循环来等待所有任务完成。这进一步提高了计算的重叠性。
经过这个小更新后,脚本运行时间为 1.4
秒,总体速度提升了 32%
!对于两行代码来说,效果相当不错。
我们还可以再次使用 Chrome 追踪器来观察发生了什么:
现在我们可以看到所有的 LSTM
实例都在完全并行地运行。
结论
在本教程中,我们学习了 fork()
和 wait()
,这些是 TorchScript 中实现动态、跨操作并行的基本 API。我们看到了使用这些函数来并行化 TorchScript 代码中函数、方法或 Modules
执行的几种典型用法模式。最后,我们通过一个示例展示了如何使用这种技术优化模型,并探索了 PyTorch 中可用的性能测量和可视化工具。