Python异步IO(协程)asyncio使用介绍

  • 2019-01-07 11:03:49
  • 开发
  • 67
  • shevechco

通常在Python中我们进行并发编程一般都是使用多线程或者多进程来实现的,对于计算型任务由于GIL的存在我们通常使用多进程来实现,而对与IO型任务我们可以通过线程调度来让线程在执行IO任务时让出GIL,从而实现表面上的并发。

其实对于IO型任务我们还有一种选择就是协程,协程是运行在单线程当中的“并发”,协程相比多线程一大优势就是省去了多线程之间的切换开销,获得了更大的运行效率。Python中的asyncio也是基于协程来进行实现的。

我们先来看如何创建协程和task任务

01.
import asyncio
02.
03.
async def test_work():
04.
    return "Test work proccess !"
05.
06.
coroutine = test_work()
07.
loop = asyncio.get_event_loop()
08.
task = loop.create_task(coroutine)
09.
print(task)
10.
loop.run_until_complete(task)
11.
print(task.result())

menu.saveimg.savepath20190107110808.jpg

回调callback

我们从上述例子中可以看到,当协程任务完成返回finished状态的时候我们可以直接从任务结果即task的result方法获取结果

我们再来看看绑定回调的例子

01.
import asyncio
02.
03.
async def test_work():
04.
    return "Test work proccess !"
05.
06.
def callback(future):
07.
    print("Callback: ", future.result())
08.
09.
coroutine = test_work()
10.
loop = asyncio.get_event_loop()
11.
task = asyncio.ensure_future(coroutine)
12.
task.add_done_callback(callback)
13.
loop.run_until_complete(task)

menu.saveimg.savepath20190107111453.jpg

阻塞和await

使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行。

耗时的操作一般是一些IO操作,例如网络请求,文件读取等。我们使用asyncio.sleep函数来模拟IO操作。协程的目的也是让这些IO操作异步化。

01.
import asyncio
02.
03.
async def test1():
04.
    print("Test work proccess !")
05.
    await asyncio.sleep(3)
06.
    print("Test work proccess2 !")
07.
08.
async def test2():
09.
    print("This second task !")
10.
11.
coroutine1 = test1()
12.
coroutine2 = test2()
13.
loop = asyncio.get_event_loop()
14.
task1 = asyncio.ensure_future(coroutine1)
15.
task2 = asyncio.ensure_future(coroutine2)
16.
loop.run_until_complete(task1)
17.
loop.run_until_complete(task2)

menu.saveimg.savepath20190107113744.jpg

在 sleep的时候,使用await让出控制权。即当遇到阻塞调用的函数的时候,使用await方法将协程的控制权让出,以便loop调用其他的协程。现在我们的例子就用耗时的阻塞操作了。

并发和并行

asyncio实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。创建多个协程的列表,然后将这些协程注册到事件循环中

01.
import asyncio
02.
03.
async def do_some_work(x):
04.
    print('start:', x)
05.
    await asyncio.sleep(x)
06.
    print('end:',x)
07.
08.
coroutine1 = do_some_work(1)
09.
coroutine2 = do_some_work(2)
10.
coroutine3 = do_some_work(4)
11.
12.
tasks = [
13.
    asyncio.ensure_future(coroutine1),
14.
    asyncio.ensure_future(coroutine2),
15.
    asyncio.ensure_future(coroutine3)
16.
]
17.
18.
loop = asyncio.get_event_loop()
19.
print(asyncio.wait(tasks))
20.
loop.run_until_complete(asyncio.wait(tasks))

menu.saveimg.savepath20190107114000.jpg

此时我们使用了aysncio实现了并发。asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一个task列表,后者接收一堆task。

协程嵌套

使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。

01.
import asyncio
02.
import time
03.
now = lambda: time.time()
04.
async def do_some_work(x):
05.
    print('Waiting: ', x)
06.
    await asyncio.sleep(x)
07.
    return 'Done after {}s'.format(x)
08.
async def main():
09.
    coroutine1 = do_some_work(1)
10.
    coroutine2 = do_some_work(2)
11.
    coroutine3 = do_some_work(4)
12.
    tasks = [
13.
        asyncio.ensure_future(coroutine1),
14.
        asyncio.ensure_future(coroutine2),
15.
        asyncio.ensure_future(coroutine3)
16.
    ]
17.
    dones, pendings = await asyncio.wait(tasks)
18.
    for task in dones:
19.
        print('Task ret: ', task.result())
20.
start = now()
21.
loop = asyncio.get_event_loop()
22.
loop.run_until_complete(main())
23.
print('TIME: ', now() - start)

menu.saveimg.savepath20190107114229.jpg

如果使用的是 asyncio.gather创建协程对象,那么await的返回值就是协程运行的结果。

协程停止

上面见识了协程的几种常用的用法,都是协程围绕着事件循环进行的操作。future对象有几个状态:

01.
Pending
02.
Running
03.
Done
04.
Cancelled

创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消。可以使用asyncio.Task获取事件循环的task

01.
import asyncio
02.
import time
03.
 
04.
now = lambda: time.time()
05.
 
06.
async def do_some_work(x):
07.
    print('Waiting: ', x)
08.
    await asyncio.sleep(x)
09.
    return 'Done after {}s'.format(x)
10.
 
11.
coroutine1 = do_some_work(1)
12.
coroutine2 = do_some_work(2)
13.
coroutine3 = do_some_work(2)
14.
 
15.
tasks = [
16.
    asyncio.ensure_future(coroutine1),
17.
    asyncio.ensure_future(coroutine2),
18.
    asyncio.ensure_future(coroutine3)
19.
]
20.
 
21.
start = now()
22.
loop = asyncio.get_event_loop()
23.
try:
24.
    loop.run_until_complete(asyncio.wait(tasks))
25.
except KeyboardInterrupt as e:
26.
    print(asyncio.Task.all_tasks())
27.
    for task in asyncio.Task.all_tasks():
28.
        print(task.cancel())
29.
    loop.stop()
30.
    loop.run_forever()
31.
finally:
32.
    loop.close()
33.
print('TIME: ', now() - start)

启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt。然后通过循环asyncio.Task取消future。可以看到输出如下

01.
Waiting:  1
02.
Waiting:  2
03.
Waiting:  2
04.
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1032b10a8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317d38>()]> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317be8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>}
05.
True
06.
True
07.
True
08.
True
09.
TIME:  0.8858370780944824

True表示cannel成功,loop stop之后还需要再次开启事件循环,最后在close,不然还会抛出异常:

01.
Task was destroyed but it is pending!
02.
task: <Task pending coro=<do_some_work() done,

循环task,逐个cancel是一种方案,可是正如上面我们把task的列表封装在main函数中,main函数外进行事件循环的调用。这个时候,main相当于最外出的一个task,那么处理包装的main函数即可。

01.
import asyncio
02.
import time
03.
 
04.
now = lambda: time.time()
05.
 
06.
async def do_some_work(x):
07.
    print('Waiting: ', x)
08.
 
09.
    await asyncio.sleep(x)
10.
    return 'Done after {}s'.format(x)
11.
 
12.
async def main():
13.
    coroutine1 = do_some_work(1)
14.
    coroutine2 = do_some_work(2)
15.
    coroutine3 = do_some_work(2)
16.
 
17.
    tasks = [
18.
        asyncio.ensure_future(coroutine1),
19.
        asyncio.ensure_future(coroutine2),
20.
        asyncio.ensure_future(coroutine3)
21.
    ]
22.
    done, pending = await asyncio.wait(tasks)
23.
    for task in done:
24.
        print('Task ret: ', task.result())
25.
 
26.
start = now()
27.
loop = asyncio.get_event_loop()
28.
task = asyncio.ensure_future(main())
29.
try:
30.
    loop.run_until_complete(task)
31.
except KeyboardInterrupt as e:
32.
    print(asyncio.Task.all_tasks())
33.
    print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
34.
    loop.stop()
35.
    loop.run_forever()
36.
finally:
37.
    loop.close()


内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.sulao.cn/post/626

相关推荐