torch.futures
此包提供了一个 Future
类型,用于封装异步执行,并提供了一系列实用函数来简化对 Future
对象的操作。目前,Future
类型主要被 分布式 RPC 框架 使用。
- 类torch.futures.Future(*, devices=None)
-
这是一个围绕
torch._C.Future
的封装,用于包含可调用对象的异步执行,例如rpc_async()
。此外,它还提供了一组API来添加回调函数和设置结果。警告
GPU支持是一项Beta功能,可能随时发生变化。
- add_done_callback(callback)[源代码]
-
将给定的回调函数附加到此
Future
,当Future
完成时会触发该回调。可以向同一个Future
添加多个回调,但它们执行的顺序无法保证。回调必须接受一个参数,即此Future
的引用。回调函数可以使用value()
方法获取值。需要注意的是,如果此Future
已经完成,则给定的回调将立即执行。我们建议你使用
then()
方法,因为它提供了一种在回调完成后进行同步的方式。如果你的回调不返回任何内容,则add_done_callback
可能更便宜。但是,无论是then()
还是add_done_callback
,它们在底层都使用相同的回调注册 API。对于 GPU 张量来说,该方法的行为与
then()
相同。- 参数
-
callback (
Future
) – 一个带有参数的Callable
,该参数是此Future
对象的引用。
注意
注意,如果回调函数抛出异常(无论是通过原始 future 对象完成时带有异常并调用
fut.wait()
,还是通过回调中的其他代码),必须仔细进行错误处理。例如,如果此回调稍后完成了额外的 futures,这些 futures 不会被标记为已完成的状态,并且用户需要独立地处理和等待这些 futures。- 示例:
-
>>> def callback(fut): ... print("This will run after the future has finished.") ... print(fut.wait()) >>> fut = torch.futures.Future() >>> fut.add_done_callback(callback) >>> fut.set_result(5) This will run after the future has finished. 5
- done()[源代码]
-
如果此
Future
已完成,则返回True
。当Future
具有结果或异常时,它就被认为是已完成的。如果值包含位于 GPU 上的张量,即使填充这些张量的异步内核尚未在设备上完成运行,
Future.done()
仍将返回True
。因为在该阶段结果已经是可用的,只要执行适当的同步操作(参见wait()
)。- 返回类型
- set_exception(result)[源代码]
-
为此
Future
设置一个异常,将其标记为已完成并带有错误状态,并触发所有附加的回调。需要注意的是,在调用此Future
的 wait()/value() 方法时,此处设置的异常将会被内联抛出。- 参数
-
result (BaseException) - 此
Future
对象的异常。
- 示例:
-
>>> fut = torch.futures.Future() >>> fut.set_exception(ValueError("foo")) >>> fut.wait() Traceback (most recent call last): ... ValueError: foo
- set_result(result)[源代码]
-
设置此
Future
的结果,将其标记为已完成,并触发所有附加的回调。需要注意的是,Future
不能被标记两次为已完成。如果结果包含位于 GPU 上的张量,即使填充这些张量的异步内核尚未在设备上完成运行,只要在此方法调用时将那些内核入队的流设置为当前流,则仍可以调用此方法。简单来说,在启动这些内核后立即调用此方法是安全的,并且无需任何额外同步,只要在这期间不更改流即可。此方法将在所有相关的当前流上记录事件,并使用它们来确保
Future
的所有消费者都能得到正确的调度。- 参数
-
result (对象) – 此
Future
的结果。
- 示例:
-
>>> import threading >>> import time >>> def slow_set_future(fut, value): ... time.sleep(0.5) ... fut.set_result(value) >>> fut = torch.futures.Future() >>> t = threading.Thread( ... target=slow_set_future, ... args=(fut, torch.ones(2) * 3) ... ) >>> t.start() >>> print(fut.wait()) tensor([3., 3.]) >>> t.join()
- 然后(回调函数)[源代码]
-
将给定的回调函数附加到此
Future
对象,在Future
完成时执行该回调。可以向同一个Future
添加多个回调,但它们的执行顺序无法保证(要强制某种顺序,请考虑链接:fut.then(cb1).then(cb2)
)。回调函数必须接受一个参数,即此Future
对象的引用。回调函数可以使用value()
方法获取值。如果此Future
已经完成,则给定的回调将立即内联执行。如果 Future 的值包含位于 GPU 上的张量,回调可能在填充这些张量的异步内核尚未完成执行时被调用。然而,回调将使用一些专用流(从全局池中获取)与那些内核同步。因此,回调对这些张量进行的操作将在内核完成后安排在设备上执行。换句话说,只要回调不切换流,它就可以安全地操纵结果而无需额外的同步。这类似于
wait()
的非阻塞行为。类似地,如果回调函数返回包含驻留在 GPU 上的张量的值,并且这些张量仍在设备上生成,那么即使相关内核还在运行,也可以这样做。前提是回调函数在执行过程中没有更改流。如果需要更改流,则必须小心重新同步到原始流,即那些在调用回调时当前正在使用的流。
- 参数
-
callback (
Callable
) – 一个以该Future
为唯一参数的Callable
。 - 返回值
-
一个新的
Future
对象,它持有callback
的返回值,并在给定的callback
完成后被标记为已完成。 - 返回类型
-
Future[S]
注意
注意,如果回调函数抛出异常(无论是通过原始的 future 因为异常而完成并调用
fut.wait()
引发,还是通过回调中的其他代码引发),由then
返回的 future 将会适当地标记遇到的错误。然而,如果此回调后来完成了额外的 futures,这些 futures 不会被标记为由于错误而完成,并且用户需要独立地处理和等待这些 futures。- 示例:
-
>>> def callback(fut): ... print(f"RPC return value is {fut.wait()}.") >>> fut = torch.futures.Future() >>> # The inserted callback will print the return value when >>> # receiving the response from "worker1" >>> cb_fut = fut.then(callback) >>> chain_cb_fut = cb_fut.then( ... lambda x : print(f"Chained cb done. {x.wait()}") ... ) >>> fut.set_result(5) RPC return value is 5. Chained cb done. None
- value()[源代码]
-
获取一个已完成后未来事件的值。
此方法应在调用
wait()
完成后或在传递给then()
的回调函数内部被调用。否则,Future
可能尚未持有值,并且调用value()
可能会失败。如果值包含位于 GPU 上的张量,则此方法不会执行任何额外的同步操作。建议在此之前通过调用
wait()
进行同步(除非在回调函数内部,此时已经由then()
处理)。- 返回值
-
此
Future
对象持有的值。如果在创建该值的过程中(例如通过回调或远程过程调用)发生了错误,value()
方法也会抛出相应的错误。 - 返回类型
-
- wait()[源代码]
-
阻塞,直到该
Future
的值准备好。如果值包含位于 GPU 上的张量,则会与在设备上执行的内核进行额外的异步同步操作。这种同步是非阻塞的,意味着
wait()
会在当前流中插入必要的指令,以确保后续在此流中排队的操作将在异步内核之后正确调度。即使那些内核仍在运行,wait()
也会立即返回。只要不更改流,在访问和使用这些值时不需要进一步的同步。- 返回值
-
此
Future
对象持有的值。如果在创建该值的过程中(例如通过回调或远程过程调用)发生了错误,那么调用此wait
方法时也会抛出相应的错误。 - 返回类型
-
- torch.futures.collect_all(futures)[源代码]
-
将提供的
Future
对象收集到一个单一的组合Future
中,当所有的子Future
完成时,该组合Future
也会完成。- 示例:
-
>>> fut0 = torch.futures.Future() >>> fut1 = torch.futures.Future() >>> fut = torch.futures.collect_all([fut0, fut1]) >>> fut0.set_result(0) >>> fut1.set_result(1) >>> fut_list = fut.wait() >>> print(f"fut0 result = {fut_list[0].wait()}") fut0 result = 0 >>> print(f"fut1 result = {fut_list[1].wait()}") fut1 result = 1