Python中的协程

Posted by FridayLi on May 26, 2018

本周组内分享我给大家分享了我对Python协程的一些理解, 梳理如下。

协程与生成器

只要 Python 函数的定义体中有 yield 关键字, 该函数就是生成器函数。 调用生成器函数时, 会返回一个生成器对象。生成器用来产出或生成值。 另外, 所有的生成器都是迭代器, 因为生成器完全实现了迭代器接口。

从语法上看, 协程与生成器类似, 都是定义体中包含 yield 关键字的函数。可是, 在协程中, yield 通常出现在表达式的右边, 如 a = yield. 我们复习一下生成器的知识。

1
2
3
4
5
6
7
8
def gen():
	value=0
	while True:
		receive=yield value
		 if receive=='e':
		     break
		value = 'got: %s' % receive
	return 'end'

对于上边的生成器函数 gen, 关键点在 receive=yield value 这一句的执行流程。:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
In [4] g = gen()

In [5]: g.send(None)
Out[5]: 0

In [6]: g.send('abc')
Out[6]: 'got: abc'

In [7]: g.send('e')
-----------------------------------------------------------------------
StopIteration                         Traceback (most recent call last)
<ipython-input-7-422bc441ae72> in <module>()
----> 1 g.send('e')

StopIteration: ''end'

  • 第一次调用函数时, 会得到一个生成器 。
  • 利用next(g) 或 g.send(None) 可以激活生成器, 此时函数执行到yield 关键字处, 并把 value 的值 ( 0 ) 返回给调用方, 并且在 received = yield value 处停止。此时并没有执行给 value 赋值的操作。
  • 调用方继续执行g.send(‘abc’), 此时send 的值会传回生成器暂停的地方, 并把值赋给received, 函数继续执行, 循环后继续在 received = yield value 处停止, 把 value 的值返回, value = ‘got: abc’
  • 如果调用方发送 哨值 ‘e’, 此时生成器终止, 抛出 StopIteration 异常。而 gen 函数在结束时return的值 ’end’ 会绑定在 StopIteration 的value上。

yield from 语法

接下来我们看看从 Python 3.3 引入的 yield from 语法。引入 yield from 结构的主要原因之一与把异常传入嵌套的协程有关。另一个原因是上协程更方便的返回值。我们看个简单的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def subgen():
    print('start subgen')
	a = yield 1
	yield 2
	yield 3
	print('end subgen')
	return a

def gen():
    print('start gen')
	result = yield from subgen()
	print('result:', result)
	print('end gen')
	return 4

def main():
	g = gen()
	print(g.send(None))
	print(g.send('a'))
	print(g.send(None))
	try:
		g.send(None)
	except StopIteration as e:
		print(e)

main()

执行main( ) 函数后输出结果如下:

1
2
3
4
5
6
7
8
9
start gen
start subgen
1
2
3
end subgen
result a
end gen
4

可以根据输出结果看到整个的流程是怎么运作的, 我们可以看出, 在生成器 gen 中使用 yield from subgen() 时, subgen 会获得控制权, 把产出的值传给 gen的调用方main 函数, main 函数也可以直接把值 send 到 subgen, 即调用方可以直接控制subgen。与此同时, gen 会阻塞, 等待 subgen 终止, 即抛出 StopIteration 异常, 而抛出的异常会被 yield from 捕获, 并把value属性的值变成yield from 表达式的值赋值给result。

简单类比的话, 可以把 yield from iterator 等价于 for item in iterator: yield item

yield from 的主要功能是打开双向通道, 如下图所示(图中各个角色的代码与上边的不同), 把最外层的调用方与最内层的子生成器连接起来, 这样二者可以直接发送和产出值, 还可以直接传入异常。 我们可以把 subgen、gen、 main 三个函数的角色分别叫做:子生成器、委派生成器和调用方。

描述

一般, 我们写的代码里会有多个函数包含yield from, 且互相调用, 其实可以把他们看做中间委派生成器管道的一个个组成部分, 把他们串联起来组成了整个疏通调用方和子生成器的管道。

不管在一个协程中委派生成器的链条多么长, 链条的顶端肯定是通过g.send() 或 next(g) 这样的方法来驱动生成器的调用方, 链条的末端肯定是只包含 yield 关键字的简单生成器。

一个简单的协程框架

谈到协程框架我们会想到 tornado 和 asyncio, 但它们相关的代码并没有那么容易看懂, 我们可以通过分析一个简单的协程框架的实现来对协程框架的各个组成部分有个直观的认识。以下代码来自 Github, 我简单处理了下以便让整个流程看起来更清楚, 源码在这里.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import socket			# on top of TCP
import time
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector()


class Future:
    def __init__(self):
        self.callbacks = []

    def resolve(self):		# on future event callback
        for func in self.callbacks:
            func()


class Task:
    def __init__(self, gen, eventLoop):
        self.gen = gen
        self.step()

    def step(self):			# go to next step/next yield
        try:
            f = next(self.gen)
            f.callbacks.append(self.step)
        except StopIteration as e:
            # Task is finished
            eventLoop.n_task -= 1


class EventLoop:
    def __init__(self):
        self.n_task = 0

    def add_task(self, generator):
        self.n_task += 1
        Task(generator, self)

    def start(self):
        while self.n_task > 0:
            events = selector.select()
            for event, mask in events:
                f = event.data
                f.resolve()


def pause(s, event):
    f = Future()
    selector.register(s.fileno(), event, data=f)
    yield f		# pause this function


def async_get(path):
    s = socket.socket()
    s.setblocking(False)
    try:
        s.connect(('localhost', 3000))
    except BlockingIOError as e:
        pass
        print('socket {} connecting...'.format(s.fileno()))

    yield from pause(s, EVENT_WRITE)
    selector.unregister(s.fileno())

    request = 'GET %s HTTP/1.0\r\n\r\n' % path
    s.send(request.encode())

    total_received = []
    while True:
        yield from pause(s, EVENT_READ)
        selector.unregister(s.fileno())

        received = s.recv(1000)
        if received:
            print('{}received'.format(s.fileno()), received)
            total_received.append(received)
        else:
            body = (b''.join(total_received)).decode()
            print('--------------------------------------')
            print(body)
            print('--------------------------------------', 'Byte Received:', len(body), '\n\n')
            return


if __name__ == '__main__':
    start = time.time()
    eventLoop = EventLoop()

    for i in range(50):
        eventLoop.add_task(async_get('/super-slow'))

    eventLoop.start()

    print('%.1f sec' % (time.time() - start))

在main() 函数里, 我们会向本机的3000 端口发送5条(为了演示结果输出更简洁, 自己运行时可增加为500或更高)网络请求, 服务端的代码在index.js 里(代码在上边给出的github地址里), 我们可以通过 node index.js 来启动服务, 而每一个请求过来, 服务端会等待三秒后再发送返回数据。我们来运行下看看效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
python3 coroutine.py

socket 4 connecting...
socket 5 connecting...
socket 6 connecting...
socket 7 connecting...
socket 8 connecting...

4received b'HTTP/1.1 200 OK\r\nX-Powered-By: Express\r\nDate: Sat, 26 May 2018 08:21:41 GMT\r\nConnection: close\r\n\r\nSuper Slow Response'
5received b'HTTP/1.1 200 OK\r\nX-Powered-By: Express\r\nDate: Sat, 26 May 2018 08:21:41 GMT\r\nConnection: close\r\n\r\nSuper Slow Response'
6received b'HTTP/1.1 200 OK\r\nX-Powered-By: Express\r\nDate: Sat, 26 May 2018 08:21:41 GMT\r\nConnection: close\r\n\r\nSuper Slow Response'
7received b'HTTP/1.1 200 OK\r\nX-Powered-By: Express\r\nDate: Sat, 26 May 2018 08:21:41 GMT\r\nConnection: close\r\n\r\nSuper Slow Response'
8received b'HTTP/1.1 200 OK\r\nX-Powered-By: Express\r\nDate: Sat, 26 May 2018 08:21:41 GMT\r\nConnection: close\r\n\r\nSuper Slow Response'


3.0 sec

我们看到发送五条请求耗时是3秒而不是3*5=15秒, 那是怎么做到的呢? 我们先看看各个函数对应的角色

  • Future : 封装回调函数, 在事件恢复时执行。
  • pause: —— subgen (类似aiohttp, asycio.sleep等底层库)
  • 业务逻辑(asyc_get): —— gen
  • Task: 封装 Future, 驱动协程 —— 调用方
  • EventLoop: 循环执行排定的任务,直到任务全部执行完。

协程, 其实归根结底是控制程序运行流程的技术。在这里网络请求和响应之所以没有阻塞, 主要是因为:

  1. socket 设置成了非阻塞状态
  2. 一个 socket 发送完请求后就把流程让出去, 并在 selectors 中注册这个 socket 的读事件。
  3. selector 循环查找可执行事件, 当服务端返回响应, socket 可读时, 取出这个注册的事件, 并且取出和事件绑定的 future, 执行注册到 future 的回调函数,这里future 回调函数是Task 的step 函数, 通过调用 next(f) 来驱动协程持续运作。
  4. 多个socket每次都是执行完不怎么耗时的操作后就把控制权让出去,如果接受的响应数据一次没接收完,会自动排到队尾等待下次轮到它时再继续接收(可以把 received = s.recv(1000) 改成 received = s.recv(10) 看看输出结果)。
  5. socket 接收完数据后打破循环,抛出 StopIteration, Task 数目减一。
  6. 所有 Task 都执行完后, 程序结束。

asyncio

最后,我们从上边的例子得到协程框架的运作方式之后, 来看一个 asyncio 的官方例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def compute(x, y):
	print("Compute %s + %s ..." % (x, y))
	await asyncio.sleep(1.0)
	return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

它的执行逻辑如下图所示:

描述

我们可以看到, print_sum 和 compute 函数都是委派生成器的一环, 底层调用的 asycio.sleep 函数可以看做是子生成器。而 EventLoop 和 Task 的作用与我们上边讲的相一致, EventLoop用来按序执行排定的任务, Task用来在事件触发时来直接驱动底层生成器继续往下执行,当底层生成器执行完后, 一层层往上抛出 StopIteration, 完成本次任务的执行。

这里的例子, 用到的不是网络 I/O, 而是 sleep 函数,用来设置休眠一段时间。它的事件触发和socket的事件触发不太一样, 可以从以下简化后的 EventLoop 函数中体会下两种不同的事件是如何排定执行顺序的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class EventLoop:
    def __init__(self):
        self.events_to_listen = []
        self.callbacks = {}
        self.timeout = None

    def register_event(self, event, callback):
        self.events_to_listen.append(event)
        self.callbacks[event] = callback

    def unregister_event(self, event):
        self.events_to_listen.remove(evenrt)
        del self.callbacks[event]

    def _process_events(self, events):
        for event in events:
            self.callbacks[event](event)
    
    def call_later(self, delay, callback):
	    self.call_at(now() + delay, callback)

	def call_at(self, when, callback):
	    self.timeout_callbacks[when] = callback

	def start_loop(self):
	    while True:
	        timeout = min(self.timeout_callbacks.keys()) - now()
	        events_happend = poll_events(self.events_to_listen, timeout)
	        if not empty(events_happend):
	            self._process_events(events_happend)
	        self._process_timeout_events()
	
	def _process_timeout_events(self):
	    time_now = now()
	    for time, callback in self.timeout_callbacks.iteritems():
	        if time < time_now:
	            callback()
	            del self.timeout_callbacks[time]

这里 poll_events 之前,会去计算所有计时器事件最少需要等待的时间,这个时间内即使没有事件发生,poll_events 也会退出,以便触发计时器事件。 _process_timeout_events 函数的作用是对比当前时间与计时器的目标执行时间,如果目标执行时间已经到达,则执行相应的回调函数。

参考文章

asyncio 执行逻辑 协程框架源码 生成器和yield from Pycon 协程PPT Pycon 协程Youtube