torch.futures

此包提供了一个 Future 类型,用于封装异步执行,并提供了一系列实用函数来简化对 Future 对象的操作。目前,Future 类型主要被 分布式 RPC 框架 使用。

torch.futures.Future(*, devices=None)

这是一个围绕torch._C.Future的封装,用于包含可调用对象的异步执行,例如rpc_async()。此外,它还提供了一组API来添加回调函数和设置结果。

警告

GPU支持是一项Beta功能,可能随时发生变化。

add_done_callback(callback)[源代码]

将给定的回调函数附加到此 Future,当 Future 完成时会触发该回调。可以向同一个 Future 添加多个回调,但它们执行的顺序无法保证。回调必须接受一个参数,即此 Future 的引用。回调函数可以使用 value() 方法获取值。需要注意的是,如果此 Future 已经完成,则给定的回调将立即执行。

我们建议你使用then() 方法,因为它提供了一种在回调完成后进行同步的方式。如果你的回调不返回任何内容,则add_done_callback 可能更便宜。但是,无论是then() 还是add_done_callback,它们在底层都使用相同的回调注册 API。

对于 GPU 张量来说,该方法的行为与 then() 相同。

参数

callback (Future) – 一个带有参数的 Callable,该参数是此 Future 对象的引用。

注意

注意,如果回调函数抛出异常(无论是通过原始 future 对象完成时带有异常并调用 fut.wait(),还是通过回调中的其他代码),必须仔细进行错误处理。例如,如果此回调稍后完成了额外的 futures,这些 futures 不会被标记为已完成的状态,并且用户需要独立地处理和等待这些 futures。

示例:
>>> def callback(fut):
...     print("This will run after the future has finished.")
...     print(fut.wait())
>>> fut = torch.futures.Future()
>>> fut.add_done_callback(callback)
>>> fut.set_result(5)
This will run after the future has finished.
5
done()[源代码]

如果此 Future 已完成,则返回 True。当 Future 具有结果或异常时,它就被认为是已完成的。

如果值包含位于 GPU 上的张量,即使填充这些张量的异步内核尚未在设备上完成运行,Future.done() 仍将返回 True。因为在该阶段结果已经是可用的,只要执行适当的同步操作(参见wait())。

返回类型

bool

set_exception(result)[源代码]

为此 Future 设置一个异常,将其标记为已完成并带有错误状态,并触发所有附加的回调。需要注意的是,在调用此 Future 的 wait()/value() 方法时,此处设置的异常将会被内联抛出。

参数

result (BaseException) - 此 Future 对象的异常。

示例:
>>> fut = torch.futures.Future()
>>> fut.set_exception(ValueError("foo"))
>>> fut.wait()
Traceback (most recent call last):
...
ValueError: foo
set_result(result)[源代码]

设置此 Future 的结果,将其标记为已完成,并触发所有附加的回调。需要注意的是,Future 不能被标记两次为已完成。

如果结果包含位于 GPU 上的张量,即使填充这些张量的异步内核尚未在设备上完成运行,只要在此方法调用时将那些内核入队的流设置为当前流,则仍可以调用此方法。简单来说,在启动这些内核后立即调用此方法是安全的,并且无需任何额外同步,只要在这期间不更改流即可。此方法将在所有相关的当前流上记录事件,并使用它们来确保 Future 的所有消费者都能得到正确的调度。

参数

result (对象) – 此 Future 的结果。

示例:
>>> import threading
>>> import time
>>> def slow_set_future(fut, value):
...     time.sleep(0.5)
...     fut.set_result(value)
>>> fut = torch.futures.Future()
>>> t = threading.Thread(
...     target=slow_set_future,
...     args=(fut, torch.ones(2) * 3)
... )
>>> t.start()
>>> print(fut.wait())
tensor([3., 3.])
>>> t.join()
然后(回调函数)[源代码]

将给定的回调函数附加到此 Future 对象,在 Future 完成时执行该回调。可以向同一个 Future 添加多个回调,但它们的执行顺序无法保证(要强制某种顺序,请考虑链接:fut.then(cb1).then(cb2))。回调函数必须接受一个参数,即此 Future 对象的引用。回调函数可以使用 value() 方法获取值。如果此 Future 已经完成,则给定的回调将立即内联执行。

如果 Future 的值包含位于 GPU 上的张量,回调可能在填充这些张量的异步内核尚未完成执行时被调用。然而,回调将使用一些专用流(从全局池中获取)与那些内核同步。因此,回调对这些张量进行的操作将在内核完成后安排在设备上执行。换句话说,只要回调不切换流,它就可以安全地操纵结果而无需额外的同步。这类似于 wait() 的非阻塞行为。

类似地,如果回调函数返回包含驻留在 GPU 上的张量的值,并且这些张量仍在设备上生成,那么即使相关内核还在运行,也可以这样做。前提是回调函数在执行过程中没有更改流。如果需要更改流,则必须小心重新同步到原始流,即那些在调用回调时当前正在使用的流。

参数

callback (Callable) – 一个以该 Future 为唯一参数的 Callable

返回值

一个新的Future对象,它持有callback的返回值,并在给定的callback完成后被标记为已完成。

返回类型

Future[S]

注意

注意,如果回调函数抛出异常(无论是通过原始的 future 因为异常而完成并调用 fut.wait() 引发,还是通过回调中的其他代码引发),由 then 返回的 future 将会适当地标记遇到的错误。然而,如果此回调后来完成了额外的 futures,这些 futures 不会被标记为由于错误而完成,并且用户需要独立地处理和等待这些 futures。

示例:
>>> def callback(fut):
...     print(f"RPC return value is {fut.wait()}.")
>>> fut = torch.futures.Future()
>>> # The inserted callback will print the return value when
>>> # receiving the response from "worker1"
>>> cb_fut = fut.then(callback)
>>> chain_cb_fut = cb_fut.then(
...     lambda x : print(f"Chained cb done. {x.wait()}")
... )
>>> fut.set_result(5)
RPC return value is 5.
Chained cb done. None
value()[源代码]

获取一个已完成后未来事件的值。

此方法应在调用wait()完成后或在传递给then()的回调函数内部被调用。否则,Future可能尚未持有值,并且调用value()可能会失败。

如果值包含位于 GPU 上的张量,则此方法不会执行任何额外的同步操作。建议在此之前通过调用wait()进行同步(除非在回调函数内部,此时已经由then()处理)。

返回值

Future 对象持有的值。如果在创建该值的过程中(例如通过回调或远程过程调用)发生了错误,value() 方法也会抛出相应的错误。

返回类型

wait()[源代码]

阻塞,直到该 Future 的值准备好。

如果值包含位于 GPU 上的张量,则会与在设备上执行的内核进行额外的异步同步操作。这种同步是非阻塞的,意味着 wait() 会在当前流中插入必要的指令,以确保后续在此流中排队的操作将在异步内核之后正确调度。即使那些内核仍在运行,wait() 也会立即返回。只要不更改流,在访问和使用这些值时不需要进一步的同步。

返回值

Future 对象持有的值。如果在创建该值的过程中(例如通过回调或远程过程调用)发生了错误,那么调用此 wait 方法时也会抛出相应的错误。

返回类型

torch.futures.collect_all(futures)[源代码]

将提供的Future对象收集到一个单一的组合Future中,当所有的子Future完成时,该组合Future也会完成。

参数

futures (列表) – 一个包含 Future 对象的列表。

返回值

将传入的 Futures 列表中的每个 Future 对象返回。

返回类型

Future[List[Future]]

示例:
>>> fut0 = torch.futures.Future()
>>> fut1 = torch.futures.Future()
>>> fut = torch.futures.collect_all([fut0, fut1])
>>> fut0.set_result(0)
>>> fut1.set_result(1)
>>> fut_list = fut.wait()
>>> print(f"fut0 result = {fut_list[0].wait()}")
fut0 result = 0
>>> print(f"fut1 result = {fut_list[1].wait()}")
fut1 result = 1
torch.futures.wait_all(futures)[源代码]

等待所有提供的 future 对象完成,并返回它们的结果列表。如果有任何一个 future 对象出现错误,该方法会提前结束并报告错误,而不是继续等待其他 future 对象完成。

参数

futures (列表) – 一个包含 Future 对象的列表。

返回值

已完成的Future结果列表。如果在等待任何Future时出现异常,该方法将会抛出错误。

返回类型

列表

本页目录