异步编程 #
协程 Coroutine #
协程是在单个线程中实现的异步函数,可以在需要的时候暂停执行,在需要的时候恢复执行。
yield & send #
早期的协程是通过生成器实现的。
每个生成器都可以执行 send()
方法,为生成器内部的 yield
语句发送数据。
此时 yield
语句不再只是 yield xxx
的形式,可以是 var = yield xxx
的赋值形式。
它同时具备两个功能,一是暂停并返回函数,二是接收外部 send()
方法发送过来的值,重新激活函数,并将这个值赋值给 var 变量。
def consumer():
status = True
while True:
# yield返回状态,并接收 send 参数 n
n = yield status
print("我拿到了{}!".format(n))
if n == 3:
status = False
def producer(consumer):
n = 5
while n > 0:
# yield给主程序返回消费者的状态
yield consumer.send(n)
n -= 1
# return a generator object
c = consumer()
# 重要:将生成器的语句推进到第一个yield语句出现的位置,此时yield语句还没有被执行
c.send(None)
# return a generator object
p = producer(c)
for status in p:
if status == False:
print("我只要3,4,5就行啦")
break
@asyncio.coroutine 与 yield from #
在3.5之前,asyncio 是使用它来创建协程的
@asyncio.coroutine
:asyncio模块中的装饰器,用于将一个生成器声明为协程yield from
语法可以把生成器的操作委托给另一个生成器
import asyncio
@asyncio.coroutine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(1.0)
return x + y
@asyncio.coroutine
def print_sum(x, y):
result = yield from compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
tasks = [print_sum(1, 2), print_sum(3, 4)]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
async/await #
Python3.5中对协程提供了更直接的支持,引入了async/await关键字
使用 async
代替 @asyncio.coroutine
,使用 await
代替 yield from
async
: async函数和普通函数区别是,执行时可以暂停,交出执行权await
: 执行遇到await,会在异步任务开始执行之后,暂停当前 async 函数的执行, 把执行权交给事件循环,让其他 async 函数执行,等待下次被唤醒
Event Loop #
asyncio 在单线程上启动一个事件循环(event loop),时刻监听新进入循环的事件,加以处理,并不断重复这个过程,直到异步任务结束。
python 3.7 的使用方式
- 先通过
asyncio.get_event_loop()
获取事件循环 loop 对象 - 然后通过不同的策略调用
loop.run_until_complete()
或者loop.run_forever()
执行异步函数
在 python 3.7 之后的版本,直接使用 asyncio.run()
即可,该函数总是会创建一个新的事件循环并在结束时进行关闭。
import asyncio
async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')
asyncio.run(main())
底层 loop #
asyncio.get_running_loop()
获取当前线程正在使用的loop对象asyncio.get_event_loop()
获取当前正在使用的loop对象。 如果当前线程还没有loop对象,那么就会创建一个新的loop对象,并使用asyncio.set_event_loop(loop)
方法设置到当前线程中asyncio.new_event_loop()
创建一个新的loop对象asyncio.set_event_loop(loop)
将loop设置成系统线程使用的对象loop.run_until_complete(future)
future对象执行完成才返回loop.run_forever()
一直运行,直到调用了loop.stop()方法loop.stop()
停止loop对象loop.is_running()
判断loop是否正在运行loop.is_closed()
判断loop是否关闭loop.close()
关闭loop对象loop.call_soon(callback, *args, context=None)
在事件循环的下一次迭代中执行callback方法,args是方法中的参数loop.call_soon_threadsafe(callback, *args, context=None)
线程安全的call_soon()loop.call_later(delay, callback, *args, context=None)
延迟delay秒后执行loop.call_at(when, callback, *args, context=None)
在指定时间点执行loop.time()
返回当前时间
Task #
await 将当前协程会挂起,让出 CPU 资源,但会阻塞当前协程。
下面代码在等待 1 秒后打印 “hello”,然后 再次 等待 2 秒后打印 “world”
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
return what
async def main():
print(f"started at {time.strftime('%X')}")
print(await say_after(2, 'hello'))
print(await say_after(1, 'world'))
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
输出
started at 17:13:52
hello
world
finished at 17:13:55
asyncio.create_task()
将一个协程对象转化为一个任务对象,并将该任务对象加入到事件循环中进行调度。
不会阻塞当前协程,返回该任务对象,并立即返回,不会阻塞当前协程。
另外,由于任务对象是异步的,它可以在后台进行处理。
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
return what
async def main():
task1 = asyncio.create_task(say_after(2, 'hello'))
task2 = asyncio.create_task(say_after(1, 'world'))
print(f"started at {time.strftime('%X')}")
print(await task1)
print(await task2)
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
预期的输出显示代码段的运行时间比之前快了 1 秒
started at 17:14:32
hello
world
finished at 17:14:34
task 对象是一个 Future 对象的子类,它表示一个异步操作的执行状态。 通过 task 对象,可以获取该异步操作的执行状态,包括是否完成、是否出现异常、返回值等信息。 此外,task 对象还提供了一些方法,如添加回调函数、取消操作等。
gather #
asyncio.gather()
接受多个协程作为参数,并返回一个协程。
调用该协程时,它会并发运行所有的协程,并在它们全部完成后返回一个包含所有返回值的列表。
gather 不是创建新的 task 对象,而是将多个协程对象封装成一个 Future 对象,然后将这个 Future 对象提交给事件循环进行调度。 和 task 对象一样,Future 对象也表示一个异步操作的执行状态,但是它不能添加回调函数、取消操作等。
import asyncio
async def coroutine_1():
await asyncio.sleep(1)
return 1
async def coroutine_2():
await asyncio.sleep(2)
return 2
async def main():
results = await asyncio.gather(coroutine_1(), coroutine_2())
print(results)
asyncio.run(main())
Lock #
asyncio.Lock
用于 asyncio 任务的互斥锁,用来保证对共享资源的独占访问。
lock = asyncio.Lock()
async with lock:
# access shared state