《Fluent Python》笔记 协程

生成器作为协程

协程是指一个过程, 这个过程与调用方协作, 产出由调用方提供的值。

协程使用的简单演示(用作协程的生成器):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
>>> def simple_coroutine(): # 生成器函数
... print('-> coroutine started')
... x = yield # yield表达式右边为协程产出的值,没有默认为None
... print('-> coroutine received:', x)
...
>>> my_coro = simple_coroutine() # 得到生成器对象
>>> my_coro
<generator object simple_coroutine at 0x100c2be10>
>>> next(my_coro) # 启动生成器,到第一个yield处暂停
-> coroutine started
>>> my_coro.send(42) # 向协程中发送数据,协程定义体中yield表达式会计算出42
-> coroutine received: 42
Traceback (most recent call last): #
...
StopIteration

协程有四个状态,协程当前的状态可以使用inspect.getgeneratorstate() 函数确定, 该函数会返回下述字符串中的一个。
**’GEN_CREATED’**:等待开始执行。
**’GEN_RUNNING’**:解释器正在执行。
**’GEN_SUSPENDED’**:在 yield 表达式处暂停。
**’GEN_CLOSED’**:执行结束。

因为 send 方法的参数会成为暂停的 yield 表达式的值, 所以, 仅当协程处于暂停状态(’GEN_SUSPENDED’)时才能调用 send 方法,否则会报错。

生成器实例化后得到的协程my_coro处于’GEN_CREATED’状态,通过next(my_coro) (也可以调用my_coro.send(None),效果相同)激活协程变为’GEN_RUNNING’状态(第一次次激活叫做预激)。运行到yield表达式变为’GEN_SUSPENDED’状态,协程定义体执行结束变为’GEN_CLOSED’状态。

通过装饰器预激协程

因为预激是使用协程的关键步骤,为了简化协程的用法,有时会使用一个预激装饰器,这样可以避免忘记预激操作。预激装饰器示例:

1
2
3
4
5
6
7
8
9
10
from functools import wraps

def coroutine(func):
"""装饰器:向前执行到第一个`yield`表达式, 预激`func`"""
@wraps(func)
def primer(*args,**kwargs): # 将被装饰的生成器函数替换成函数primer,返回预激后的生成器
gen = func(*args,**kwargs) # 获取生成器对象
next(gen) # 预激
return gen # 返回预激后的生成器
return primer

终止协程和异常处理

未处理的异常会导致协程终止。

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class DemoException(Exception):
"""为这次演示定义的异常类型。 """

def demo_exc_handling():
print('-> coroutine started')
try:
while True:
try:
x = yield
except DemoException:
print('*** DemoException handled. Continuing...')
else:
print('-> coroutine received: {!r}'.format(x))
finally:
pass # 在协程终止时执行的操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
>>> exc_coro = demo_exc_handling() # 获取作为协程的生成器对象
>>> next(exc_coro) # 预激协程
-> coroutine started
>>> exc_coro.send(11) # 发送数据给协程,通过send函数发送给协程的值会被yield表达式接收
-> coroutine received: 11
>>> exc_coro.send(22)
-> coroutine received: 22
>>> from inspect import getgeneratorstate
>>> exc_coro.throw(DemoException)
*** DemoException handled. Continuing...
>>> getgeneratorstate(exc_coro)
'GEN_SUSPENDED'
>>> exc_coro.close()
>>> getgeneratorstate(exc_coro)
'GEN_CLOSED

重要方法:

  • generator.send(...)

    如果发送给协程的值(通常可以使用内置的 NoneEllipsis)在协程定义体中参与运算抛出异常,且这个抛出的异常未处理,那么就会导致协程终止。

  • generator.throw(exc_type[, exc_value[, traceback]])

    使协程在暂停的 yield 表达式处抛出指定的异常(exc_type)。 如果协程处理了抛出的异常, 代码会向前执行到下一个 yield 表达式, 而产出的值会成为调用 generator.throw方法得到的返回值。 如果协程没有处理抛出的异常, 异常会向上冒泡, 传到调用方的上下文中。并且协程也会终止。

  • generator.close()

    致使协程在暂停的 yield 表达式处抛出 GeneratorExit 异常。如果协程没有处理这个异常, 或者抛出了 StopIteration 异常(通常是指运行到结尾),调用方不会报错(此时协程终止退出)。 如果收到 GeneratorExit 异常, 协程一定不能产出值, 否则解释器会抛出 RuntimeError 异常。协程抛出的其他异常会向上冒泡, 传给调用方。并且协程也会终止。

虽然上面所说的协程其实本质上是生成器对象,此时生成器对象的行为体现了协程。

yield from在协程中的运用

yield from的主要功能是打开双向通道, 把最外层的调用方与最内层的子生成器连接起来, 这样二者可以直接发送和产出值, 还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的样板代码。 有了这个结构, 协程可以通过以前不可能的方式委托职责。

image-20221214165543797

  • 委派生成器
    包含 yield from <iterable> 表达式的生成器函数。

  • 子生成器
    yield from 表达式中 <iterable> 部分获取的生成器。

该结构运行流程如下:

委派生成器在 yield from 表达式处暂停时, 调用方可以直接把数据发给子生成器, 子生成器再把产出的值发给调用方。 子生成器返回之后, 解释器会抛出 StopIteration 异常, 并把返回值附加到异常对象上, 此时委派生成器会恢复。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from collections import namedtuple

Result = namedtuple('Result', 'count average')

# 子生成器
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break
total += term
count += 1
average = total/count
return Result(count, average) # 生成器的返回值

# 委派生成器
def grouper(results, key):
while True:
results[key] = yield from averager()

# 客户端代码,即调用方
def main(data):
results = {}
for key, values in data.items():
group = grouper(results, key)
next(group) # 预激协程
for value in values:
group.send(value)
group.send(None) # 终止协程
report(results)

# 输出报告
def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit))

data = {
'girls;kg':
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}

if __name__ == '__main__':
main(data)
1
2
3
4
9 boys averaging 40.42kg
9 boys averaging 1.39m
10 girls averaging 42.04kg
10 girls averaging 1.43m

grouper 发送的每个值都会经由 yield from 处理, 通过管道传给averager 实例。 grouper 会在 yield from 表达式处暂停, 等待averager 实例处理客户端发来的值。 averager 实例运行完毕后, 返回的值绑定到 results[key] 上。