一、事件循环的运行机制

asyncio的核心是一个单线程事件循环(Event Loop),它调度所有协程(Coroutine)在同一个线程上交替执行,实现协作式并发。理解事件循环的调度机制,是写出高效异步代码的基础。

1.1 事件循环的四个核心能力

import asyncio

# asyncio事件循环的四大职责:

# ① 协程调度:决定哪个协程获得CPU
async def coro_a():
    print("A1")
    await asyncio.sleep(0)  # 主动让出CPU
    print("A2")

async def coro_b():
    print("B1")
    await asyncio.sleep(0)
    print("B2")

# gather: 并发运行多个协程
async def main():
    await asyncio.gather(coro_a(), coro_b())

# 输出顺序(取决于调度器):
# A1 B1 A2 B2  或  A1 B1 B2 A2
# (因为await asyncio.sleep(0)让出CPU,调度器选择下一个)

# ② I/O事件监听:使用epoll(Linux)/kqueue(macOS)
# ③ 定时任务调度:Timer Callback
# ④ 跨线程通信:Queue / Event / Future

# 查看当前事件循环实现:
loop = asyncio.new_event_loop()
print(loop.get_debug())  # True时打印详细调度信息

二、async/await的执行原理

2.1 协程对象的内部状态

# async def 创建的是一个协程对象(Coroutine)
# 不是立即执行的函数!

async def fetch_data():
    return {"id": 1, "name": "test"}

# coroutine对象的三种状态:
# CREATED → PENDING → FINISHED
# 用inspect模块查看状态
import inspect
 coro = fetch_data()
print(inspect.getcoroutinestate(coro))  # CORO_CREATED

# 必须在事件循环中运行才能推进状态
asyncio.get_event_loop().run_until_complete(coro)
# 输出:CORO_FINISHED

# await的底层做了什么:
# ① 暂停当前协程(状态=PENDING)
# ② 将回调注册到I/O完成队列
# ③ 当事件就绪时,唤醒并继续执行

# 生产代码不要这样做:手动创建事件循环
# ❌ asyncio.run()  # 每次创建新的事件循环

# ✅ 正确方式:在uvicorn/django等框架中使用同一个事件循环
# 框架会自动管理事件循环的生命周期

2.2 Task与Future

# Task = Future + 协程调度
# asyncio.create_task() 将协程包装成Task

async def worker(name: str, delay: float):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    # 方式一:创建Task(立即调度)
    task1 = asyncio.create_task(worker("A", 0.5))

    # 方式二:ensure_future(更底层)
    task2 = asyncio.ensure_future(worker("B", 0.3))

    # 方式三:gather收集结果
    result = await asyncio.gather(
        asyncio.create_task(worker("C", 0.4)),
        asyncio.create_task(worker("D", 0.2)),
    )
    print(result)  # ['C done', 'D done']

    # Task的回调钩子
    def on_done(t: asyncio.Task):
        try:
            result = t.result()
            print(f"Task完成: {result}")
        except Exception as e:
            print(f"Task异常: {e}")

    task1.add_done_callback(on_done)

    # 等待多个Task(任一完成时返回)
    done, pending = await asyncio.wait(
        [task1, task2],
        timeout=1.0,  # 1秒超时
        return_when=asyncio.FIRST_COMPLETED
    )
    # FIRST_COMPLETED: 任一完成
    # ALL_COMPLETED: 全部完成
    # FIRST_EXCEPTION: 任一异常

# ⚠️ 重要:create_task后立即await和先await再create的区别
async def bad_pattern():
    result = await worker("A", 1.0)  # 串行!A完成后才创建B
    task = asyncio.create_task(worker("B", 1.0))

async def good_pattern():
    task = asyncio.create_task(worker("A", 1.0))  # A和B并发执行
    result = await worker("B", 1.0)
    result_a = await task
    return result_a

三、并发HTTP请求实战

3.1 aiohttp连接池

import aiohttp

async def fetch_all(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        async def fetch_one(url: str) -> dict:
            async with session.get(url) as resp:
                return {"url": url, "status": resp.status, "body": await resp.json()}

        # 并发执行,限制并发数
        tasks = [fetch_one(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

# ⚠️ 无限并发的问题:
# urls = [10000个URL]
# await asyncio.gather(*[fetch_one(u) for u in urls])
# → 同时发起10000个TCP连接 → 服务器拒绝 / 本地端口耗尽

# ✅ Semaphore限流
async def fetch_with_limit(session, url, semaphore):
    async with semaphore:
        async with session.get(url) as resp:
            return await resp.json()

async def bounded_fetch(urls: list[str], max_concurrent: int = 50):
    semaphore = asyncio.Semaphore(max_concurrent)

    async with aiohttp.ClientSession() as session:
        async def bounded(url):
            async with semaphore:
                return await session.get(url)

        tasks = [bounded(url) for url in urls]
        responses = await asyncio.gather(*tasks, return_exceptions=True)

        # 处理响应
        results = []
        for resp in responses:
            if isinstance(resp, Exception):
                results.append({"error": str(resp)})
            else:
                results.append(await resp.json())

        return results

3.2 连接池配置优化

connector = aiohttp.TCPConnector(
    limit=100,              # 全局并发连接数上限
    limit_per_host=30,       # 单host并发连接数
    ttl_dns_cache=300,       # DNS缓存时间(秒)
    enable_cleanup_closed=True,
    force_close=False,       # True=禁用keepalive
)

timeout = aiohttp.ClientTimeout(total=10, connect=5, sock_read=5)

async with aiohttp.ClientSession(
    connector=connector,
    timeout=timeout,
) as session:
    # 连接复用:同Session内连接自动复用
    for i in range(100):
        async with session.get("https://api.example.com/data") as resp:
            # 实际只建立少量TCP连接,100次请求复用
            data = await resp.json()

四、异步上下文与结构化并发

# asyncio.TaskGroup(Python 3.11+,替代原来的wait_for略)
async def parent():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fetch_data("A"))
            tg.create_task(fetch_data("B"))
            # TaskGroup在退出上下文时自动等待所有Task完成
            # 任一Task抛出异常,其他Task被取消
    except* Exception as eg:
        for e in eg.exceptions:
            print(f"捕获异常: {e}")

# ⚠️ 警惕:异步中的同步阻塞
async def bad_async():
    import time  # 阻塞!会阻塞整个事件循环
    time.sleep(5)  # 整个程序卡住5秒

    # ✅ 用asyncio.sleep替代
    await asyncio.sleep(5)  # 非阻塞,事件循环可调度其他协程

# ⚠️ 警惕:run_in_executor桥接同步代码
async def call_sync_heavy():
    loop = asyncio.get_event_loop()
    # 将同步阻塞代码放到线程池执行
    result = await loop.run_in_executor(
        ThreadPoolExecutor(max_workers=4),
        heavy_sync_function,
        arg
    )
    return result