官网:https://docs.python.org/zh-cn/3/library/asyncio-task.html#scheduling-from-other-threads
用async定义的函数,可以叫协程函数、异步函数,本文统一叫协程函数。
调用协程函数返回的对象叫协程对象。
关键字 await 调用协程函数,也可以叫等待、等待调用,这里统一叫等待。
协程通过 async/await 语法进行声明,是编写异步应用的推荐方式。例如,以下代码段 (需要 Python 3.7+) 打印 "hello",等待 1 秒,然后打印 "world":
>>> import asyncio >>> async def main(): ... print('hello') ... await asyncio.sleep(1) ... print('world') >>> asyncio.run(main()) hello world
跑一个协程有三种方法:
1、asyncio.run() 函数用来运行一个协程函数,比如上文中 asyncio.run(main())
2、await关键字等待一个协程函数
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") asyncio.run(main())
输出:
started at 17:13:52 hello world finished at 17:13:55
3、用asyncio.create_task() 调用协程函数,生成一个asyncio的任务,然后用await等待这个任务。
async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime('%X')}")
如代码中的第2、5行,是完整的调用协程函数。
如果一个对象可以在await
语句中使用,那么它就是可等待 对象。
可等待 对象有三种主要类型:协程,任务 和Future.
1、协程
await 等待协程
import asyncio async def nested(): return 42 async def main(): # Nothing happens if we just call "nested()". # A coroutine object is created but not awaited, # so it *won't run at all*. nested() # Let's do it differently now and await it: print(await nested()) # will print "42". asyncio.run(main())
第10行的普通调用不会执行,程序也不会报错,会出现警告:
RuntimeWarning: coroutine 'nested' was never awaited nested() RuntimeWarning: Enable tracemalloc to get the object allocation traceback Process finished with exit code 0
2、任务
任务 被用来设置到系统日程里以便并发 执行的协程。本质上是一个协程,需要用await调用。
当一个协程通过asyncio.create_task()
等函数被打包为一个任务,该协程将自动排入日程等待立即运行:
import asyncio async def nested(): return 42 async def main(): # Schedule nested() to run soon concurrently # with "main()". task = asyncio.create_task(nested()) # "task" can now be used to cancel "nested()", or # can simply be awaited to wait until it is complete: await task asyncio.run(main())
第9行,task是一个future对象,可以取消任务、添加回调等,下文会提及。
3、future对象
以下文字来自官网:
Future
是一种特殊的低层级 可等待对象,表示一个异步操作的最终结果。当一个 Future 对象被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。
在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。
通常情况下没有必要 在应用层级的代码中创建 Future 对象。
Future 对象有时会由库和某些 asyncio API 暴露给用户,用作可等待对象:
一个很好的返回对象的低层级函数的示例是
loop.run_in_executor()
async def main(): await function_that_returns_a_future_object() # this is also valid: await asyncio.gather( function_that_returns_a_future_object(), some_python_coroutine() )
来自官网:
asyncio.
run
(coro,*,debug=False)此函数运行传入的协程,负责管理 asyncio 事件循环并完结异步生成器。
当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。
如果debug 为
True
,事件循环将以调试模式运行。此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。
虽然官网说同一线程中只能有一个asyncio 事件循环,但是测试一个py文件中可以有两个asyncio.run(), 并且都能顺利执行。
import asyncio async def foo(): return 42 async def foo2(): return 43 async def main(): task = asyncio.create_task(foo()) await task return 'main1' async def main2(): task2 = asyncio.create_task(foo2()) return await task2 print(asyncio.run(main())) print(asyncio.run(main2()))
结果:
main1 43
asyncio.run(fn()) 函数的返回结果就是调用函数 fn() 返回的结果
await task 表达式的值就是task返回的值
来自官网:
asyncio.
create_task
(coro)将coro协程 打包为一个
Task
排入日程准备执行。返回 Task 对象。该任务会在
get_running_loop()
返回的循环中执行,如果当前线程没有在运行的循环则会引发RuntimeError
。此函数在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的
asyncio.ensure_future()
函数。
以下例子来自官网,两种创建任务方式:
async def coro(): ... # In Python 3.7+ task = asyncio.create_task(coro()) ... # This works in all Python versions but is less readable task = asyncio.ensure_future(coro())
asyncio.
sleep
(delay,result=None,*,loop=None)
参数:
阻塞delay 指定的秒数。
如果指定了result,则当协程完成时将其返回给调用者。
sleep()
总是会挂起当前任务,以允许其他任务运行。
以下协程示例运行 5 秒,每秒显示一次当前日期:
import asyncio import datetime async def display_date(): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) asyncio.run(display_date())
asyncio.
gather
(*aws,loop=None,return_exceptions=False)
参数:
aws 表示序列中的可等待对象。
return_exceptions 表示是是将异常立即返回,还是聚合到结果列表里。
如果aws 中的某个可等待对象为协程,它将自动作为一个任务加入日程。
如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与aws 中可等待对象的顺序一致。
如果return_exceptions 为
False
(默认),所引发的首个异常会立即传播给等待gather()
的任务。aws 序列中的其他可等待对象不会被取消 并将继续运行。如果return_exceptions 为
True
,异常会和成功的结果一样处理,并聚合至结果列表。如果
gather()
被取消,所有被提交 (尚未完成) 的可等待对象也会被取消。如果aws 序列中的任一 Task 或 Future 对象被取消,它将被当作引发了
CancelledError
一样处理 -- 在此情况下gather()
调用不会 被取消。这是为了防止一个已提交的 Task/Future 被取消导致其他 Tasks/Future 也被取消。如果gather 本身被取消,则无论return_exceptions 取值为何,消息都会被传播(propagated)。
asyncio.
shield
(aw,*,loop=None)
如果aw 是一个协程,它将自动作为任务加入日程。
res = await shield(something())
不同之处 在于如果包含它的协程被取消,在
something()
中运行的任务不会被取消。从something()
的角度看来,取消操作并没有发生。然而其调用者已被取消,因此 "await" 表达式仍然会引发CancelledError
。如果通过其他方式取消
something()
(例如在其内部操作) 则shield()
也会取消。
如果希望完全忽略取消操作 (不推荐) 则shield()
函数需要配合一个 try/except 代码段,如下所示:
try: res = await shield(something()) except CancelledError: res = None
asyncio.
wait_for
(aw,timeout,*,loop=None)
等待aw可等待对象 完成,指定 timeout 秒数后超时。
如果aw 是一个协程,它将自动作为任务加入日程。
timeout 可以为
None
,也可以为 float 或 int 型数值表示的等待秒数。如果timeout 为None
,则等待直到完成。如果发生超时,任务将取消并引发
asyncio.TimeoutError
,aw 指定的对象也会被取消。如果等待期间,任务被取消,aw 指定的对象也会被取消。
async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print('yay!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except asyncio.TimeoutError: print('timeout!') asyncio.run(main()) # timeout! # Process finished with exit code 0
asyncio.
wait
(aws,*,loop=None,timeout=None,return_when=ALL_COMPLETED)
并发运行aws 指定的可等待对象 并阻塞线程直到满足return_when 指定的条件。
如果aws 中的某个可等待对象为协程,它将自动作为任务加入日程。
aws必须为容器
返回两个 Task/Future 集合:
(done, pending)
。用法:
done, pending = await asyncio.wait([aws, ])
如指定timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数。
请注意此函数不会引发
asyncio.TimeoutError
。当超时发生时,未完成的 Future 或 Task 将在指定秒数后被返回。
return_when 指定此函数应在何时返回。它必须为以下常数之一:
常数 | 描述 |
FIRST_COMPLETED | 函数将在任意可等待对象结束或取消时返回。 |
FIRST_EXCEPTION | 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于ALL_COMPLETED 。 |
ALL_COMPLETED | 函数将在所有可等待对象结束或取消时返回。 |
async def foo(): return 42 async def main(): task = asyncio.create_task(foo()) done, pending = await asyncio.wait([task, ]) if task in done: print('done') asyncio.run(main()) ''' 结果如下: done Process finished with exit code 0 '''
如果第6行 await asyncio.wait(task) 直接传入任务会报错:
TypeError: expect a list of futures, not Task
wait 和 wait_for的不同
wait()
在超时发生时不会取消可等待对象, wait_for() 会取消。wait() 第一个参数传入任务的序列,不能直接传入序列, wait_for() aws第一个参数直接传入任务。
as_completed
(aws,*,loop=None,timeout=None)
并发地运行aws 集合中的可等待对象。返回一个
Future
对象的迭代器。返回的每个 Future 对象代表来自剩余可等待对象集合的最早结果。如果在所有 Future 对象完成前发生超时则将引发
asyncio.TimeoutError
。
示例:
for f in as_completed(aws): earliest_result = await f # ...
wait 和 as_completed的不同
两个函数都是等待任务执行结束,都传入任务的列表,都可以设定等待时间。
区别在于:1.有序无序
wait返回结果是无序的,调用是对传入的序列进行了集合化处理,as_completed是有序的。
2.返回结果不同
wait返回结果有两个,是完成和未完成的任务task的集合,可以对task.result() 直接获取任务的结果。
as_completed返回结果有一个,是一个含有协程对象的生成器,对其中的每一个协程对象,需要用await <协程> 获取结果。
实例代码:
wait:
async def foo(arg): await asyncio.sleep(arg) return arg async def main(): task = asyncio.create_task(foo(2)) task1 = asyncio.create_task(foo(1)) task2 = asyncio.create_task(foo(4)) done, pending = await asyncio.wait([task, task1, task2]) print([f.result() for f in done]) # [1, 4, 2] asyncio.run(main())
as_completed:
async def foo(arg): await asyncio.sleep(arg) return arg async def main(): task = asyncio.create_task(foo(2)) task1 = asyncio.create_task(foo(1)) task2 = asyncio.create_task(foo(4)) gen = asyncio.as_completed([task, task1, task2]) # print(gen) # <generator object as_completed at 0x7f4d3b9dd930> print([await f for f in gen]) # [1, 2, 4] asyncio.run(main())
Task
(coro,*,loop=None)
可运行 Python协程。非线程安全。
创建:asyncio.create_task(),loop.create_task(), asyncio.ensure_future()
取消:cancel()
获取结果:result()
获取异常:exception()
添加回调:add_done_callback(callback,*,context=None)移除回调:remove_done_callback(callback)
判断是否取消:cancelled()
判断是否结束:
done
()当 Task 所封包的协程返回一个值、引发一个异常或 Task 本身被取消时,则会被认为已完成。
asyncio.
current_task
(loop=None)
返回当前运行的
Task
实例,如果没有正在运行的任务则返回None
。
asyncio.
all_tasks
(loop=None)
返回事件循环所运行的未完成的
Task
对象的集合。