PEP 0492 Coroutines with async and await syntax 中文翻译

原文地址: PEP-0492

PEP492
标题协程与 async/await 语法
作者Yury Selivanov <yury at magic.io>
翻译ipfans <ipfanscn at gmail.com>
状态最终稿
Python 版本3.5
翻译最后更新2015-11-03

目录

摘要
API 设计和实现的备注
基本原理和目标
语法规范

专用术语表
函数与方法列表
移植计划

设计思路 (暂时不考虑翻译)
性能

实现参考

参考

致谢

版权信息

摘要

不断增长的网络连通性需求带动了对响应性、伸缩性代码的需求。这个 PEP 的目标在于回答如何更简单的、Pythinic 的实现显式的异步 / 并发的 Python 代码。

我们把协程概念独立出来,并为其使用新的语法。最终目标是建立一个通用、易学的 Python 异步编程模型,并尽量与同步编程的风格保持一致。

这个 PEP 假设异步任务被一个事件循环器(类似于标准库里的 asyncio.events.AbstractEventLoop)管理和调度。不过,我们并不会依赖某个事件循环器的具体实现方法,从本质上说只与此相关:使用 yield 作为给调度器的信号,表示协程将会挂起直到一个异步事件(如 IO)完成。

我们相信这些改变将会使 Python 在这个异步编程快速增长的领域能够保持一定的竞争性,就像许多其它编程语言已经、将要进行的改变那样。

API 设计和实现的备注

根据 Python 3.5 Beta 期间的反馈,我们进行了重新设计:明确的把协程从生成器里独立出来 — 原生协程现在拥有了自己完整的独立类型,而不再是一种新的生成器类型。

这个改变主要是为了解决在 Tornado Web 服务中里集成协程时出现的一些问题。

基本原理和目标

现在版本的 Python 支持使用生成器实现协程功能 (PEP-342),后面通过 PEP-380 引入了 yield from 语法进行了增强。但是这样仍有一些缺点:

  • 协程与常规的生成器在相同语法时用以混淆,尤其是对心开发者而言。
  • 一个函数是否是协程需要通过是否主体代码中使用了 yield 或者 yield from 语句进行检测,这样在重构代码中添加、去除过程中容易出现不明显的错误
  • 异步调用的支持被 yield 支持的语法先定了,导致我们无法使用更多的语法特性,比如 withfor 语句。

这个提议的目的是将协程作为原生 Python 语言特性,并且将他们与生成器明确的区分开。它避免了生成器 / 协程中间的混淆请困高,方便编写出不依赖于特定库的协程代码。这个也方便 linter 和 IDE 能够实现更好的进行静态代码分析和重构。

原生协程和相关的新语法特性使得可以在异步框架下可以定义一个上下文管理器和迭代协议。在这个提议后续中,新的 async with 语法让 Python 程序在进入和离开运行上下文时实现异步调用,新的 async for 语法可以在迭代器中实现异步调用。

语法规范

这个提议介绍了新的语法用于增强 Python 中的协程支持。

这个语法规范假设你已经了解 Python 现有协程实现方法 (PEP-342PEP-380)。这次语法改变的动机来自于 asyncio 框架 (PEP-3156) 和 Cofunctions 提议 (PEP-3152,现在此提议已被废弃)。

从本文档中,我们使用 原生协程 代指新语法生命的函数,基于生成器的协程 用于表示那些基于生成器语法实现的协程。协程 则表示两个地方都可以使用的内容。

新协程声明语法

下面的新语法用于声明原生协程:

async def read_data(db):
    pass

协程的主要属性包括:

  • async def 函数始终为协程,即使它不包含 await 表达式。
  • 如果在 async 函数中使用 yield 或者 yield from 表达式会产生 SyntaxError 错误。
  • 在内部,引入了两个新的代码对象标记:
    • CO_COROUTINE 用于标记原生协程(和新语法一起定义)
    • CO_ITERABLE_COROUTINE 用于标记基于生成器的协程,兼容原生协程。(通过 types.coroutine() 函数设置)
  • 常规生成器在调用时会返回一个 genertor 对象,同理,协程在调用时会返回一个 coroutine 对象。
  • 协程不再抛出 StopIteration 异常,而是替代为 RuntimeError。常规生成器实现类似的行为需要进行引入 __future__(PEP-3156)
  • 当协程进行垃圾回收时,一个从未被 await 的协程会抛出 RuntimeWarning 异常。(参考 调试特性)
  • 更多内容请参考 协程对象 一节。

types.coroutine()

types 模块中新添加了一个函数 coroutine(fn) 用于 asyncio 中基于生成器的协程与本 PEP 中引入的原生携协程互通。

@types.coroutine
def process_data(db):
    data = yield from read_data(db)
    ...

这个函数将生成器函数对象设置 CO_ITERABLE_COROUTINE 标记,将返回对象变为 coroutine 对象。

如果 fn 不是一个生成器函数,那么它会对其进行封装。如果它返回一个生成器,那么它会封装一个 awaitable 代理对象 (参考下面 awaitable 对象的定义)。

注意:CO_COROUTINE 标记不能通过 types.coroutine() 进行设置,这就可以将新语法定义的原生协程与基于生成器的协程进行区分。

types 模块添加了一个新函数 coroutine(fn),使用它,“生成器实现的协程” 和 “原生协程” 之间可以进行互操作。

Await 表达式

下面新的 await 表达式用于获取协程执行结果:

async def read_data(db):
    data = await db.fetch('SELECT ...')
    ...

awaityield from 相似,挂起 read_data 协程的执行直到 db.fetch 这个 awaitable 对象完成并返回结果数据。

它复用了 yield from 的实现,并且添加了额外的验证参数。await 只接受以下之一的 awaitable 对象:

  • 一个原生协程函数返回的原生协程对象。
  • 一个使用 types.coroutine() 修饰器的函数返回的基于生成器的协程对象。
  • 一个包含返回迭代器的 __await__ 方法的对象。
    任意一个 yield from 链都会以一个 yield 结束,这是 Future 实现的基本机制。因此,协程在内部中是一种特殊的生成器。每个 await 最终会被 await 调用链条上的某个 yield 语句挂起(参考 PEP-3156 中的进一步解释)。
    为了启用协程的这一特点,一个新的魔术方法 __await__ 被添加进来。在 asyncio 中,对于对象在 await 语句启用 Future 对象只需要添加 __await__ = __iter__ 这行到 asyncio.Future 类中。
    在本 PEP 中,带有 __await__ 方法的对象也叫做 Future-like 对象。 同样的,请注意到 __aiter__ 方法(下面会定义)不能用于这种目的。它是不同的协议,有点类似于用 __iter__ 替代普通调用方法的 __call___
    如果 __await__ 返回非迭代器类型数据,会产生一个 TypeError.
  • CPython C API 中使用 tp_as_async.am_await 定义的函数,并且返回一个迭代器(类似 __await__ 方法)。

新的操作符优先级列表

关键词 awaityieldyield form 操作符的区别是 await 表达式大部分情况下不需要括号包裹。

同样的,yield from 允许允许任意表达式做其参数,包含表达式如 yield a()+b(),这样通常处理作为 yield from (a()+b()),这个通常会造成 Bug。通常情况下任意算数操作的结果都不会是 awaitable 对象。为了避免这种情况,我们将 await 的优先级调整为低于 [], () 和.,但是高于 ** 操作符。

操作符描述
yield x , yield from xYield 表达式
lambdaLambda 表达式
if – else条件表达式
or布尔或
and布尔与
not x布尔非
in , not in , is , is not , <, <= ,> , >= , != , ==比较,包含成员测试和类型测试
|字节或
^字节异或
&字节与
«,»移位
+ , -加和减
* , @ , / , // , %乘,矩阵乘法,除,取余
+x , -x , ~x正数, 复数, 取反
**平方
await xAwait 表达式
x[index] , x[index:index] , x(arguments…) , x.attribute子集,切片,调用,属性
(expressions…) , [expressions…] , {key: value…} , {expressions…}类型显示

await 表达式示例

有效的语法例子:

表达式会被处理为
if await fut: passif (await fut): pass
if await fut + 1: passif (await fut) + 1: pass
pair = await fut, ‘spam’pair = (await fut), ‘spam’
with await fut, open(): passwith (await fut), open(): pass
await foo()[‘spam’].baz()()await (foo()[‘spam’].baz()() )
return await coro()return ( await coro() )
res = await coro() ** 2res = (await coro()) ** 2
func(a1=await coro(), a2=0)func(a1=(await coro()), a2=0)
await foo() + await bar()(await foo()) + (await bar())
-await foo()-(await foo())

错误的语法例子:

表达式应写作
await await coro()await (await coro())
await -coro()await (-coro())

异步上下文管理与 async with

一个异步上下文管理器是用于在 enterexit 方法中管理暂停执行的上下文管理器。

为此,我们设置了新的异步上下文管理器。添加了两个魔术方法: __aenter____aexit__。这两个方法都返回 awaitable 对象。

异步上下文管理器例子如下:

class AsyncContextManager:
    async def __aenter__(self):
        await log('entering context')

    async def __aexit__(self, exc_type, exc, tb):
        await log('exiting context')

新语法

一个新的异步上下文管理语法被接受:

async with EXPR as VAR:
    BLOCK

语义上等同于:

mgr = (EXPR)
aexit = type(mgr).__aexit__
aenter = type(mgr).__aenter__(mgr)
exc = True

VAR = await aenter
try:
    BLOCK
except:
    if not await aexit(mgr, *sys.exc_info()):
        raise
else:
    await aexit(mgr, None, None, None)

和普通的 with 语句一样,可以在单个 async with 语句里指定多个上下文管理器。

在使用 async with 时,如果上下文管理器没有 __aenter____aexit__ 方法,则会引发错误。在 async def 函数之外使用 async with 则会引发 SyntaxError 异常。

例子

通过异步上下文管理器更容易实现协程对数据库事务的正确管理:

async def commit(session, data):
    ...

    async with session.transaction():
        ...
        await session.update(data)
        ...

代码看起来也更加简单:

async with lock:
    ...

而不是

with (yield from lock):
    ...

异步迭代器与 async for

一个异步迭代器能够在它的迭代实现里调用异步代码,也可以在它的 __next__ 方法里调用异步代码。为了支持异步迭代,需要:

  1. 一个对象必须实现 __aiter__ 方法(或者,使用 CPython C API 的 tp_as_async.am_aiter 定义),返回一个异步迭代器对象中的 ```awaitable```` 结果。
  2. 一个异步迭代器必须实现 __anext__ 方法(或者,使用 CPython C API 的 tp_as_async.am_anext 定义)返回一个 awaitable
  3. 停止迭代器的 __anext__ 必须抛出一个 StopAsyncIteration 异常。

一个异步迭代的例子:

class AsyncIterable:
    async def __aiter__(self):
        return self

    async def __anext__(self):
        data = await self.fetch_data()
        if data:
            return data
        else:
            raise StopAsyncIteration

    async def fetch_data(self):
        ...

新语法

一种新的异步迭代方案被采纳:

async for TARGET in ITER:
    BLOCK
else:
    BLOCK2

语义上等同于:

iter = (ITER)
iter = await type(iter).__aiter__(iter)
running = True
while running:
    try:
        TARGET = await type(iter).__anext__(iter)
    except StopAsyncIteration:
        running = False
    else:
        BLOCK
else:
    BLOCK2

如果对一个普通的不含有 __aiter__ 方法的迭代器使用 async for,会引发 TypeError 异常。如果在 async def 函数外使用 async for 会已发 SyntaxError 异常。

和普通的 for 语法一样,async for 有可选的 else 分支。

例子 1

通过异步迭代器,就可以实现通过迭代实现异步缓冲数据:

async for data in cursor:
    ...

cursor 是一个异步迭代器时,就可以在 N 次迭代后从数据库中预取 N 行数据。

下面的代码演示了新的异步迭代协议:

class Cursor:
    def __init__(self):
        self.buffer = collections.deque()

    async def _prefetch(self):
        ...

    async def __aiter__(self):
        return self

    async def __anext__(self):
        if not self.buffer:
            self.buffer = await self._prefetch()
            if not self.buffer:
                raise StopAsyncIteration
        return self.buffer.popleft()

那么这个 Cursor 类可以按照下面的方式使用:

async for row in Cursor():
    print(row)

这个等同于下面的代码:

i = await Cursor().__aiter__()
while True:
    try:
        row = await i.__anext__()
    except StopAsyncIteration:
        break
    else:
        print(row)

例子 2

下面的工具类用于将普通的迭代转换为异步。这个并没有什么实际的作用,这个代码只是用于演示普通迭代与异步迭代之间的关系。

class AsyncIteratorWrapper:
    def __init__(self, obj):
        self._it = iter(obj)

    async def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            value = next(self._it)
        except StopIteration:
            raise StopAsyncIteration
        return value

async for letter in AsyncIteratorWrapper("abc"):
    print(letter)

为什么使用 StopAsyncIteration

协程在内部实现中依旧是依赖于迭代器的。因此,在 PEP-479 生效之前,下面两者并没有区别:

def g1():
    yield from fut
    return 'spam'
and

def g2():
    yield from fut
    raise StopIteration('spam')

但是在 PEP 479 接受并且默认对协程开启时,下面的例子中的 StopIteration 会被封装成 RuntimeError

async def a1():
    await fut
    raise StopIteration('spam')

所以,想通知外部代码迭代已经结束,抛出一个 StopIteration 异常的是不行的。因此,一个新的内置异常类 StopAsyncIteration 被引入进来了。

另外,根据 PEP 479,所有协程中抛出的 StopIteration 异常都会被封装成 RuntimeError

协程对象

与生成器的不同之处

这节进适用于 CO_COROUTINE 标记的原生协程,即,使用 async def 语法定义的对象。

现有的 asyncio 库中的 * 基于生成器的协程 * 的行为未做变更。

为了将协程与生成器区别开来,定义了下面的概念:

  1. 原生协程对象不实现 __iter____next__ 方法。因此,他们不能够通过 iter(),list(),tuple() 和其他一些内置函数进行迭代。他们也不能用于 for...in 循环。
    在原生协程中尝试使用 __iter__ 或者 __next 会触发 TypeError 异常。
  2. 未被装饰的生成器不能够 yield from 一个原生协程:这样会引发 TypeError
  3. 基于生成器的协程 (asyncio 代码必须使用 @asyncio.coroutine) 可以 yield from 一个原生协程。
  4. 对原生协程对象和原生协程函数调用 inspect.isgenerator()inspect.isgeneratorfunction() 会返回 False。

协程对象方法

协程内部基于生成器,因此他们同享实现过程。类似于生成器对象,协程包含 throw()send()close() 方法。StopIterationGeneratorExit 在协程中扮演者同样的角色(尽管 PEP 479 默认对协程开启了)。参考 PEP-342, PEP-380 和 Python 文档了解更多细节。

协程的 throw()send() 方法可以用于将返回值和抛出异常推送到类似于 Future 的对象中。

调试特性

一个初学者普遍会犯的错误是忘记在协程中使用 yield from

@asyncio.coroutine
def useful():
    asyncio.sleep(1) # this will do noting without 'yield from'

为了调试这类错误,asycio 提供了一种特殊的调试模式:装饰器 @coroutine 封装所有的函数成一个特殊对象,这个对象的析构函数中记录警告。当封装的生成器垃圾回收时,会产生详细的记录信息,包括具体定义修饰函数、回收时的栈信息等等。封装对象同样提供一个 __repr__ 函数用于输出关于生成器的详细信息。

唯一的问题是如何启用这些调试功能。这些调试工具在生产模式中什么都不做,@coroutine 修饰符在系统变量 PYTHONASYNCIODEBUG 设置后才会提供调试功能。这种方式可以让 asyncio 程序使用 asyncio 自己的函数分析。EventLoop.set_debug 是另外一个调试工具,他不会影响 @coroutine 修饰符行为。

根据本提议,协程是原生的与生成器不同的概念。当抛出 RuntimeWarning 异常的协程是从来没有被 awaited 过的。因此添加了两条新的函数到 sys 模块:set_coroutine_wrapperget_coroutine_wrapper。这个用于开启 asyncio 或者其他框架中的高级调试 (比如显示协程创建的位置和垃圾回收时的栈信息)。

新的标准库函数

  • types.coroutine(gen)。参考 types.coroutine() 节中的内容。
  • inspect.iscoroutine(obj) 当 obj 是原生协程时返回 True。
  • inspect.iscoroutinefunction(obj) 当 obj 是原生协程函数时返回为 True。
  • inspect.isawaitable(obj) 当 obj 是 awaitable 时返回为 True。
  • inspect.getcoroutinestate(coro) 返回原生协程对象的当前状态(是 inspect.getfgeneratorstate(gen) 的镜像)。
  • inspect.getcoroutinelocals(coro) 返回原生协程对象的局部变量的映射(是 inspect.getgeneratorlocals(gen) 的镜像)。
  • sys.set_coroutine_wrapper(wrapper) 允许拦截原生协程对象的创建。wrapper 必须是一个接受一个参数 callable(一个协程对象),或者是 NoneNone 会重置 wrapper。当调用第二次时,新的 wrapper 会替代之前的封装。这个函数是线程专有的。参考 调度调试 了解更多细节。
  • sys.get_coroutine_wrapper() 返回当前的封装对象。如果封装未设置会返回 None。这个函数是线程专有的。参考 调度调试 了解更多细节。

新的抽象基类

为了允许更好的与现有的框架(比如 Tornado)和编译器(比如 Cython)整合,我们添加了两个新的抽象基类 (ABC):
collections.abc.AwaitableFuture-like 类的抽象基类,它实现了 __await__ 方法。
collections.abc.Coroutine 是协程对象的抽象基类,它实现了 send(value)throw(type, exc, tb)close()__await__() 方法。

值得注意的是,带有 CO_ITERABLE_COROUTINE 标记的基于生成器的协程并没有实现 __await__ 方法,因此他不是 collections.abc.Coroutinecollections.abc.Awaitable 抽象类的实例:

@types.coroutine
def gencoro():
    yield

assert not isinstance(gencoro(), collections.abc.Coroutine)

# 然而:
assert inspect.isawaitable(gencoro())

为了方便对异步迭代的调试,添加了另外两个抽象基类:

  • collections.abc.AsyncIterable – 用于测试 __aiter__ 方法
  • collections.abc.AsyncIterator – 用于测试 __aiter____anext__ 方法。

专用术语表

函数与方法列表

移植计划

向后兼容性

asyncio

asyncio 移植策略

CPython 代码中的 async/await

语法更新

失效计划

性能

总体影响

这个提议并不会造成性能影响。这是 Python 官方性能测试结果:

python perf.py -r -b default ../cpython/python.exe ../cpython-aw/python.exe

[skipped]

Report on Darwin ysmac 14.3.0 Darwin Kernel Version 14.3.0:
Mon Mar 23 11:59:05 PDT 2015; root:xnu-2782.20.48~5/RELEASE_X86_64
x86_64 i386

Total CPU cores: 8

### etree_iterparse ###
Min: 0.365359 -> 0.349168: 1.05x faster
Avg: 0.396924 -> 0.379735: 1.05x faster
Significant (t=9.71)
Stddev: 0.01225 -> 0.01277: 1.0423x larger

The following not significant results are hidden, use -v to show them:
django_v2, 2to3, etree_generate, etree_parse, etree_process, fastpickle,
fastunpickle, json_dump_v2, json_load, nbody, regex_v8, tornado_http.

编译器修改

修改后的编译器处理 Python 文件没有明显的性能下降:处理 12MB 大小的文件(Lib/test/test_binop.py 重复 1000 次)消耗时间相同。

async/await

下面的小测试用于检测『async』函数和生成器的性能差异:

import sys
import time

def binary(n):
    if n <= 0:
        return 1
    l = yield from binary(n - 1)
    r = yield from binary(n - 1)
    return l + 1 + r

async def abinary(n):
    if n <= 0:
        return 1
    l = await abinary(n - 1)
    r = await abinary(n - 1)
    return l + 1 + r

def timeit(func, depth, repeat):
    t0 = time.time()
    for _ in range(repeat):
        o = func(depth)
        try:
            while True:
                o.send(None)
        except StopIteration:
            pass
    t1 = time.time()
    print('{}({}) * {}: total {:.3f}s'.format(func.__name__, depth, repeat, t1-t0))

结果显示并没有明显的性能差异:

binary(19) * 30: total 53.321s
abinary(19) * 30: total 55.073s

binary(19) * 30: total 53.361s
abinary(19) * 30: total 51.360s

binary(19) * 30: total 49.438s
abinary(19) * 30: total 51.047s

注意:19 层意味着 1,048,575 调用。

实现参考

实现参考可以在 这里 找到。

上层修改和新协议列表

  1. 新的协程定义语法:async def 和新的 await 关键字。
  2. Future-like 对象提供新的 __await__ 方法和新的 PyTypeObject tp_as_async.am_await
  3. 新的异步上下文管理器语法: async with,协议提供了 __aenter____aexit__ 方法。
  4. 新的异步迭代语法:async for,协议提供了 __aiter__aexit 和新的内置异常 StopAsyncIterationPyTypeObject 提供了新的 tp_as_async.am_aitertp_as_async.am_anext
  5. 新的 AST 节点:AsyncFunctionDefAsyncForAsyncWithAwait
  6. 新函数 sys.set_coroutine_wrapper(callback)sys.get_coroutine_wrapper()types.coroutine(gen)inspect.iscoroutinefunction(func)inspect.iscoroutine(obj)inspect.isawaitable(obj)inspect.getcoroutinestate(coro)inspect.getcoroutinelocals(coro)
  7. 新的代码对象标记 CO_COROUTINECO_ITERABLE_COROUTINE
  8. 新的抽象基类 collections.abc.Awaitablecollections.abc.Coroutinecollections.abc.AsyncIterablecollections.abc.AsyncIterator
  9. C API 变更:新的 PyCoro_Type(将 Python 作为 types.CoroutineType 输出)和 PyCoroObjectPyCoro_CheckExact(*o) 用于检测 o 是否为原生协程。

虽然变化和新内容列表并不短,但是重要的是理解:大部分用户不会直接使用这些特性。他的目的是在于框架和库能够使用这些为用户提供便捷的使用和明确的 API 用于 async defawaitasync forasync with 语法。

可以工作的实例

本 PEP 提出的所有概念都 已经实现,并且可以被测试。

import asyncio

async def echo_server():
    print('Serving on localhost:8000')
    await asyncio.start_server(handle_connection,'localhost', 8000)

async def handle_connection(reader, writer):
    print('New connection...')

    while True:
        data = await reader.read(8192)

        if not data:
            break

        print('Sending {:.10}... back'.format(repr(data)))
        writer.write(data)

loop = asyncio.get_event_loop()
loop.run_until_complete(echo_server())
try:
    loop.run_forever()
finally:
    loop.close()
Built with Hugo
主题 StackJimmy 设计