python编程进阶(12):协程与异步IO

协程

子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。

子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行

注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:

1
2
3
4
5
6
7
8
9
def A():
print '1'
print '2'
print '3'

def B():
print 'x'
print 'y'
print 'z'

假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:

1
2
3
4
5
6
1
2
x
y
3
z

多线程比,协程有何优势?

最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。Python对协程的支持是通过generator实现的。

一个例子:生产者-消费者的协程

现在我们要让生产者发送1,2,3,4,5给消费者,消费者接受数字,返回状态给生产者,而我们的消费者只需要3,4,5就行了,当数字等于3时,会返回一个错误的状态。最终我们需要由主程序来监控生产者-消费者的过程状态,调度结束程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#-*- coding:utf-8
def consumer():
status = True
while True:
n = yield status
print("我拿到了{}!".format(n))
if n == 3:
status = False

def producer(consumer):
n = 5
while n > 0:
# yield给主程序返回消费者的状态
# consumer.send(n)把n传值给c生成器,同时返回c生成器yield的结果(相当于fetch取一个放一个东西)
yield consumer.send(n)
n -= 1

if __name__ == '__main__':
c = consumer() # c产生一个生成器(带yield语句)
c.send(None) # consumer()程序推进到yield,但yield还未被执行.send()是传值给生成器的语句
p = producer(c) # p也产生一个生成器,但传入c生成器,与p进行通信
for status in p:# 循环获取p生成器yield回来的状态
if status == False:
print("我只要3,4,5就行啦")
break
print("程序结束")

上面这个例子是典型的生产者-消费者问题,我们用协程的方式来实现它。

第一句c = consumer(),因为consumer函数中存在yield语句,python会把它当成一个generator,因此在运行这条语句后,python并不会像执行函数一样,而是返回了一个generator object。

第二条语句c.send(None),这条语句的作用是将consumer(即变量c,它是一个generator)中的语句推进到第一个yield语句出现的位置,那么在例子中,consumer中的status = Truewhile True:都已经被执行了,程序停留在n = yield status的位置(注意:此时这条语句还没有被执行),上面说的send(None)语句十分重要,如果漏写这一句,那么程序直接报错

第三句p = producer(c),这里则像上面一样定义了producer的生成器,注意的是这里我们传入了消费者的生成器,来让producer跟consumer通信。

第四句for status in p:,这条语句会循环地运行producer和获取它yield回来的状态。

现在程序流进入了producer里面,我们直接看yield consumer.send(n),生产者调用了消费者的send()方法,把n发送给consumer(即c),在consumer中的n = yield status,n拿到的是消费者发送的数字,同时,consumer用yield的方式把状态(status)返回给消费者,注意:这时producer(即消费者)的consumer.send()调用返回的就是consumer中yield的status!消费者马上将status返回给调度它的主程序,主程序获取状态,判断是否错误,若错误,则终止循环,结束程序。上面看起来有点绕,其实这里面generator.send(n)的作用是:把n发送generator(生成器)中yield的赋值语句中,同时返回generator中yield的变量(结果)。

于是程序便一直运作,直至consumer中获取的n的值变为3!此时consumer把status变为False,最后返回到主程序,主程序中断循环,程序结束。

Coroutine与Generator

有些人会把生成器(generator)和协程(coroutine)的概念混淆,我以前也会这样,不过其实发现,两者的区别还是很大的。

直接上最重要的区别:

  • generator总是生成值,一般是迭代的序列
  • coroutine关注的是消耗值,是数据(data)的消费者
  • coroutine不会与迭代操作关联,而generator会
  • coroutine强调协同控制程序流,generator强调保存状态和产生数据

相似的是,它们都是不用return来实现重复调用的函数/对象,都用到了yield(中断/恢复)的方式来实现

asyncio

asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。

asyncio的编程模型就是一个消息循环。我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。用asyncio实现Hello world代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

#@asyncio.coroutine把一个generator标记为coroutine类型,然后,我们就把这个coroutine扔到EventLoop中执行。
@asyncio.coroutine
def hello():
print("Hello world!")
# 异步调用asyncio.sleep(1):
r = yield from asyncio.sleep(1)
print("Hello again!")

# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
loop.run_until_complete(hello()) # 循环执行EventLoop里要完成的事件
loop.close()

hello()会首先打印出Hello world!,然后,yield from语法可以让我们方便地调用另一个generator。由于asyncio.sleep()也是一个coroutine所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行

我们用Task封装两个coroutine试试:

1
2
3
4
5
6
7
8
9
10
11
12
13
import threading
import asyncio

@asyncio.coroutine
def hello():
print('Hello world! (%s)' % threading.currentThread())
yield from asyncio.sleep(1)
print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

观察执行过程:

1
2
3
4
5
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>) # asyncio.sleep(1)会挂起并去执行下一个hello()协程
(暂停约1秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)

总结:

  • asyncio提供了完善的异步IO支持;
  • 异步操作需要在coroutine中通过yield from`完成;
  • 多个coroutine可以封装成一组Task然后并发执行

async/await

asyncio提供的@asyncio.coroutine可以把一个generator标记为coroutine类型,然后在coroutine内部用yield from调用另一个coroutine实现异步操作。

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法asyncawait,可以让coroutine的代码更简洁易读。

请注意,asyncawait是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:

  1. @asyncio.coroutine替换为async
  2. yield from替换为await
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio

async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y

async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
# tasks = [print_sum(1, 2), print_sum(3, 4)]
# loop.run_until_complete(asyncio.wait(tasks))
loop.close()

当事件循环开始运行时,它会在Task中寻找coroutine来执行调度,因为事件循环注册了print_sum(),因此print_sum()被调用,执行result = await compute(x, y)这条语句(等同于result = yield from compute(x, y))因为compute()自身就是一个coroutine,因此print_sum()这个协程就会暂时被挂起,compute()被加入到事件循环中,程序流执行compute()中的print语句,打印”Compute %s + %s …”,然后执行了await asyncio.sleep(1.0),因为asyncio.sleep()也是一个coroutine,接着compute()就会被挂起,等待计时器读秒,在这1秒的过程中,事件循环会在队列中查询可以被调度的coroutine,而因为此前print_sum()compute()都被挂起了,因此事件循环会停下来等待协程的调度(如果有其他协程task就会在等待时间内去执行并返回),当计时器读秒结束后,程序流便会返回到compute()中执行return语句,结果会返回到print_sum()中的result中,最后打印result,事件队列中没有可以调度的任务了,此时loop.close()把事件队列关闭,程序结束。

-------------Thanks for Reading!-------------