11.7 协程

生成器有一个主要限制:它们只能为直接调用者生成值。

为克服此限制,Python 中添加了一个额外的语法,即 yield from 语句。使用此语法,你可以将生成器重构为工具函数,然后从它们中 yield from

例如,字母生成器可以重构为一个工具函数,起始字母是该函数的一个参数。使用 yield from,你可以选择返回哪个生成器对象:

cpython-book-samples/33/letter_coroutines.py

def gen_letters(start, x):
    i = start
    end = start + x
    while i < end:
        yield chr(i)
        i += 1

def letters(upper):
    if upper:
        yield from gen_letters(65, 26) # A--Z
    else:
        yield from gen_letters(97, 26) # a--z

for letter in letters(False):
    # Lowercase a--z
    print(letter)

for letter in letters(True):
    # Uppercase A--Z
    print(letter)

生成器也非常适用于惰性序列,它们可以在其中被多次调用。

基于生成器能够暂停和恢复执行等行为,协程 的概念在 Python 中经过了多轮 API 迭代。

生成器是协程的一种有限形式,因为你可以使用 .send() 方法向它们发送数据。可以在调用者和目标之间双向发送消息。协程还将调用者存储在 cr_origin 属性中。

协程最初是通过装饰器提供的,但此后已被弃用,取而代之的是使用关键字 asyncawait 的“原生”协程。

要标记函数返回协程,必须在函数前面加上 async 关键字。 与生成器不同,async 关键字明确表示此函数返回协程而不是值。

要创建协程,你可以使用关键字 async def 定义一个函数。在此示例中,你使用 asyncio.sleep() 函数添加一个计时器并返回字符串 wake up!

>>> import asyncio
>>> async def sleepy_alarm(time):
...    await asyncio.sleep(time)
...    return "wake up!"
>>> alarm = sleepy_alarm(10)
>>> alarm
<coroutine object sleepy_alarm at 0x1041de340>

当你调用该函数时,它会返回一个协程对象。

有很多方法可以执行协程。最简单的是使用 asyncio.run(coro)。使用协程对象运行 asyncio.run(),然后在 10 秒后它会发出警报:

>>> asyncio.run(alarm)
'wake up'

协程的好处是你可以并发运行它们。因为协程对象是一个可以传递给函数的变量,所以这些对象可以链接在一起,或者按顺序创建。

例如,如果你想有十个不同时间间隔的警报并同时启动它们,那么你可以将这些协程对象转换为任务。

任务 API 用于并发调度和执行多个协程。在安排任务之前,必须运行一个事件循环。事件循环的工作是调度并发任务,并将完成、取消和异常等事件与回调连接起来。

当调用 asyncio.run()(在 Lib/asyncio/runners.py 中)时,该函数会为你执行这些任务:

  1. 开始一个新的事件循环;

  2. 将协程对象包装在任务中;

  3. 在任务完成时设置回调;

  4. 循环任务直到完成;

  5. 返回结果。

相关源文件

这是协程相关的源文件:

文件
用途

Lib/asyncio

asyncio 的 Python 标准库实现

事件循环

事件循环是将异步代码粘合在一起的粘合剂。事件循环是用纯 Python 编写的,是包含任务的对象。

循环中的任何任务都可以有回调。如果任务完成或失败,循环将运行回调:

loop = asyncio.new_event_loop()

循环内部是一系列任务,由类型 asyncio.Task 表示。任务被安排到一个循环中,然后一旦循环运行,它就会遍历所有任务,直到它们完成。

你可以将单个计时器转换为任务循环:

cpython-book-samples/33/sleepy_alarm.py

import asyncio

async def sleepy_alarm(person, time):
    await asyncio.sleep(time)
    print(f"{person} -- wake up!")

async def wake_up_gang():
    tasks = [
        asyncio.create_task(sleepy_alarm("Bob", 3), name="wake up Bob"),
        asyncio.create_task(sleepy_alarm("Yudi", 4), name="wake up Yudi"),
        asyncio.create_task(sleepy_alarm("Doris", 2), name="wake up Doris"),
        asyncio.create_task(sleepy_alarm("Kim", 5), name="wake up Kim")
    ]
    await asyncio.gather(*tasks)

asyncio.run(wake_up_gang())

这将打印以下输出:

Doris -- wake up!
Bob -- wake up!
Yudi -- wake up!
Kim -- wake up!

事件循环将遍历每个协程以查看它们是否已完成。与 yield 关键字如何从同一帧返回多个值类似,await 关键字可以返回多个状态。

事件循环将一次又一次地执行 sleepy_alarm() 协程对象,直到 await asyncio.sleep() 生成一个已完成的结果并且 print() 能够执行。

为此,你需要使用 asyncio.sleep() 而不是阻塞(也不是异步感知)的time.sleep()

例子

你可以通过以下步骤将多线程端口扫描器示例转换为 asyncio

  • check_port() 更改为使用来自 asyncio.open_connection() 的套接字连接,这会创建 future 而不是立即连接;

  • 在带有 asyncio.wait_for() 的计时器事件中使用套接字连接 future;

  • 如果成功,将端口添加到结果列表;

  • 添加一个新函数 scan(),为每个端口创建 check_port() 协程并将它们添加到 tasks 列表中;

  • 使用 asyncio.gather() 将所有的 tasks 合并到一个新的协程中;

  • 使用 asyncio.run() 运行扫描器。

这是代码:

cpython-book-samples/33/portscanner_async.py

import time
import asyncio

timeout = 1.0

async def check_port(host: str, port: int, results: list):
    try:
        future = asyncio.open_connection(host=host, port=port)
        r, w = await asyncio.wait_for(future, timeout=timeout)
        results.append(port)
        w.close()
    except OSError: # pass on port closure
        pass
    except asyncio.TimeoutError:
        pass # Port is closed, skip and continue

async def scan(start, end, host):
    tasks = []
    results = []
    for port in range(start, end):
        tasks.append(check_port(host, port, results))
    await asyncio.gather(*tasks)
    return results

if __name__ == '__main__':
    start = time.time()
    host = "localhost" # Pick a host you own
    results = asyncio.run(scan(80, 100, host))
    for result in results:
        print("Port {0} is open".format(result))
    print("Completed scan in {0} seconds".format(time.time() - start))

此扫描器仅需一秒多一点即可完成:

$ python portscanner_async.py
Port 80 is open
Completed scan in 1.0058400630950928 seconds

Last updated