一、事件循环的运行机制
asyncio的核心是一个单线程事件循环(Event Loop),它调度所有协程(Coroutine)在同一个线程上交替执行,实现协作式并发。理解事件循环的调度机制,是写出高效异步代码的基础。
1.1 事件循环的四个核心能力
import asyncio
from concurrent.futures import ThreadPoolExecutor
# asyncio事件循环的四大职责:
# ① 协程调度:决定哪个协程获得CPU
async def coro_a():
    """Print "A1", yield once to the event loop, then print "A2"."""
    before, after = "A1", "A2"
    print(before)
    # sleep(0) suspends this coroutine for one loop iteration so that any
    # other ready task gets a turn before execution resumes here.
    await asyncio.sleep(0)
    print(after)
async def coro_b():
    """Print "B1", yield once to the event loop, then print "B2"."""
    before, after = "B1", "B2"
    print(before)
    # Zero-length sleep: hand control back to the loop for one iteration.
    await asyncio.sleep(0)
    print(after)
# gather: 并发运行多个协程
async def main():
    """Run coro_a and coro_b concurrently and wait until both have finished."""
    first, second = coro_a(), coro_b()
    # gather wraps each coroutine in a Task and resumes once all are done.
    await asyncio.gather(first, second)
# 输出顺序(在CPython的asyncio实现下是确定的):
# A1 B1 A2 B2
# (因为await asyncio.sleep(0)让出CPU,事件循环的就绪队列按FIFO顺序调度下一个协程,所以A、B严格交替执行)
# ② I/O事件监听:使用epoll(Linux)/kqueue(macOS)
# ③ 定时任务调度:Timer Callback
# ④ 跨线程通信:loop.call_soon_threadsafe() / asyncio.run_coroutine_threadsafe()(注意:asyncio的Queue / Event / Future本身不是线程安全的,不能直接跨线程使用)
# Inspect the debug flag of a freshly created event loop.
loop = asyncio.new_event_loop()
try:
    # get_debug() returns True when asyncio debug mode is enabled (verbose
    # scheduling diagnostics are then logged).
    print(loop.get_debug())
finally:
    # A loop created by hand must also be closed by hand; otherwise the
    # resources it holds are leaked and Python emits a ResourceWarning.
    loop.close()
二、async/await的执行原理
2.1 协程对象的内部状态
# async def 创建的是一个协程对象(Coroutine)
# 不是立即执行的函数!
async def fetch_data(name: str = "test"):
    """Return a sample record, optionally labelled with ``name``.

    Calling this function only creates a coroutine object; the body runs
    once that object is awaited or scheduled on an event loop.

    Args:
        name: Value stored under the ``"name"`` key. Defaults to ``"test"``,
            so existing zero-argument calls keep returning the same record,
            while calls such as ``fetch_data("A")`` are now valid too.

    Returns:
        A dict of the form ``{"id": 1, "name": name}``.
    """
    return {"id": 1, "name": name}
# coroutine对象的四种状态(由inspect.getcoroutinestate返回):
# CORO_CREATED → CORO_RUNNING ⇄ CORO_SUSPENDED → CORO_CLOSED
# 用inspect模块查看状态
import inspect
# Calling the coroutine function only builds the coroutine object.
coro = fetch_data()
print(inspect.getcoroutinestate(coro))  # CORO_CREATED
# The coroutine only makes progress when driven by an event loop.
# asyncio.run() creates a loop, runs the coroutine to completion and closes
# the loop again; get_event_loop() without a running loop is deprecated.
asyncio.run(coro)
# Print the state again so the transition is actually visible.
print(inspect.getcoroutinestate(coro))  # CORO_CLOSED
# await的底层做了什么:
# ① 暂停当前协程(状态=CORO_SUSPENDED),把控制权交还给事件循环
# ② 在被等待的Future上注册唤醒回调(I/O类等待由selector监听对应的文件描述符)
# ③ 当事件就绪时,唤醒并继续执行
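# 生产代码不要这样做:手动创建/获取事件循环(get_event_loop().run_until_complete 的写法已不推荐)
# ❌ 在已有事件循环的代码里反复调用 asyncio.run():每次调用都会新建并关闭一个事件循环,且在运行中的循环内调用会抛出RuntimeError
# ✅ 独立脚本:只在程序入口调用一次 asyncio.run(main())
# ✅ 正确方式:在uvicorn/django等框架中使用同一个事件循环(直接await即可)
# 框架会自动管理事件循环的生命周期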
2.2 Task与Future
# Task = Future + 协程调度
# asyncio.create_task() 将协程包装成Task
async def worker(name: str, delay: float):
    """Wait ``delay`` seconds without blocking the loop, then report completion.

    Args:
        name: Label included in the returned message.
        delay: Number of seconds to sleep via ``asyncio.sleep``.

    Returns:
        The string ``"<name> done"``.
    """
    await asyncio.sleep(delay)
    message = f"{name} done"
    return message
async def main():
    """Demonstrate creating Tasks, collecting results and waiting on them.

    Starts two background workers, gathers the results of two more, attaches
    a completion callback, waits until the first background worker finishes
    (or one second elapses), and finally cancels whatever is still pending so
    no task outlives this coroutine.
    """

    def on_done(t: asyncio.Task):
        """Report whether the finished task was cancelled, failed or succeeded."""
        # result() raises CancelledError for a cancelled task. That is a
        # BaseException, so `except Exception` would not catch it and the
        # loop would log an unhandled callback error. Check it first.
        if t.cancelled():
            print("Task已取消")
            return
        try:
            value = t.result()
            print(f"Task完成: {value}")
        except Exception as e:
            print(f"Task异常: {e}")

    # Option 1: create_task wraps the coroutine and schedules it on the loop.
    task1 = asyncio.create_task(worker("A", 0.5))
    # Option 2: ensure_future (lower level).
    task2 = asyncio.ensure_future(worker("B", 0.3))

    # Option 3: gather returns the results in argument order.
    result = await asyncio.gather(
        asyncio.create_task(worker("C", 0.4)),
        asyncio.create_task(worker("D", 0.2)),
    )
    print(result)  # ['C done', 'D done']

    # Completion hook for task1.
    task1.add_done_callback(on_done)

    # Wait for several tasks; return_when selects the wake-up condition:
    #   FIRST_COMPLETED - return as soon as any task finishes
    #   ALL_COMPLETED   - return when every task has finished
    #   FIRST_EXCEPTION - return as soon as any task raises
    done, pending = await asyncio.wait(
        [task1, task2],
        timeout=1.0,  # give up waiting after one second
        return_when=asyncio.FIRST_COMPLETED,
    )

    # asyncio.wait never cancels anything itself. Cancel the leftovers and
    # await them so their cancellation is processed before main() returns.
    for leftover in pending:
        leftover.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
# ⚠️ 重要:create_task后立即await和先await再create的区别
async def bad_pattern():
    """Anti-pattern demo: the two workers run one after the other.

    Worker A is awaited to completion before the task for worker B is even
    created, so nothing overlaps.
    """
    finished_a = await worker("A", 1.0)  # serial: B does not exist yet
    late_task_b = asyncio.create_task(worker("B", 1.0))
async def good_pattern():
    """Run worker A as a background task while worker B is awaited inline.

    Returns:
        The result of worker A; worker B's result is discarded.
    """
    # Schedule A first so it overlaps with the inline await of B below.
    background_a = asyncio.create_task(worker("A", 1.0))
    await worker("B", 1.0)
    return await background_a
三、并发HTTP请求实战
3.1 aiohttp连接池
import aiohttp
async def fetch_all(urls: list[str]) -> list[dict | BaseException]:
    """Fetch every URL concurrently over one shared ClientSession.

    Args:
        urls: URLs to request with GET.

    Returns:
        One entry per URL, in input order. A successful entry is a dict with
        the keys ``"url"``, ``"status"`` and ``"body"`` (the decoded JSON).
        Because ``return_exceptions=True`` is used, a failed request yields
        the exception object itself instead of raising.
    """
    async with aiohttp.ClientSession() as session:

        async def fetch_one(url: str) -> dict:
            """GET one URL and return its status and decoded JSON body."""
            async with session.get(url) as resp:
                return {"url": url, "status": resp.status, "body": await resp.json()}

        # Unbounded fan-out: every request starts at once. Nothing here
        # limits concurrency.
        return await asyncio.gather(
            *(fetch_one(url) for url in urls), return_exceptions=True
        )
# ⚠️ 无限并发的问题:
# urls = [10000个URL]
# await asyncio.gather(*[fetch_one(u) for u in urls])
# → 同时发起10000个TCP连接 → 服务器拒绝 / 本地端口耗尽
# ✅ Semaphore限流
async def fetch_with_limit(session, url, semaphore):
    """GET ``url`` and decode its JSON body while holding one semaphore slot.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response with an awaitable ``json()``.
        url: Address to request.
        semaphore: Async context manager that caps concurrent requests.

    Returns:
        Whatever ``resp.json()`` produces.
    """
    # The managers are entered left to right (semaphore first, then the
    # request) and exited in reverse order once the body has been read.
    async with semaphore, session.get(url) as resp:
        payload = await resp.json()
    return payload
async def bounded_fetch(urls: list[str], max_concurrent: int = 50):
    """Fetch all URLs with at most ``max_concurrent`` requests in flight.

    Args:
        urls: URLs to request with GET.
        max_concurrent: Upper bound on simultaneous requests.

    Returns:
        One entry per URL, in input order: the decoded JSON body on success,
        or ``{"error": "<message>"}`` when the request or decoding failed.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async with aiohttp.ClientSession() as session:

        async def bounded(url):
            """Request and fully read one URL while holding a semaphore slot."""
            # Keep the slot until the body has been read and the response
            # released. Returning the raw response would free the slot early
            # and leave the connection unreleased.
            async with semaphore:
                async with session.get(url) as resp:
                    return await resp.json()

        outcomes = await asyncio.gather(
            *(bounded(url) for url in urls), return_exceptions=True
        )

    # Bodies were already decoded while the session was open; only the
    # captured exceptions still need converting into error records.
    return [
        {"error": str(outcome)} if isinstance(outcome, BaseException) else outcome
        for outcome in outcomes
    ]
3.2 连接池配置优化
# Time budget applied to every request made through the session below.
timeout = aiohttp.ClientTimeout(total=10, connect=5, sock_read=5)


async def fetch_with_pool(
    url: str = "https://api.example.com/data", count: int = 100
) -> list:
    """Issue ``count`` sequential GET requests over one pooled session.

    ``async with`` is only valid inside a coroutine, and the connector is
    created here so that it is built while an event loop is running.

    Args:
        url: Address requested on every iteration.
        count: Number of requests to send.

    Returns:
        The decoded JSON bodies, in request order.
    """
    connector = aiohttp.TCPConnector(
        limit=100,  # cap on simultaneous connections overall
        limit_per_host=30,  # cap on simultaneous connections per host
        ttl_dns_cache=300,  # DNS cache lifetime in seconds
        enable_cleanup_closed=True,
        force_close=False,  # True would disable keep-alive
    )
    payloads = []
    async with aiohttp.ClientSession(
        connector=connector,
        timeout=timeout,
    ) as session:
        # Requests within one session reuse pooled connections, so far fewer
        # TCP connections than requests are expected to be opened.
        for _ in range(count):
            async with session.get(url) as resp:
                payloads.append(await resp.json())
    return payloads
四、异步上下文与结构化并发
# asyncio.TaskGroup(Python 3.11+,结构化并发,可替代手动 create_task + gather 的写法)
async def parent():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(fetch_data("A"))
tg.create_task(fetch_data("B"))
# TaskGroup在退出上下文时自动等待所有Task完成
# 任一Task抛出异常,其他Task被取消
except* Exception as eg:
for e in eg.exceptions:
print(f"捕获异常: {e}")
# ⚠️ 警惕:异步中的同步阻塞
async def bad_async():
    """Anti-pattern demo: a blocking sleep followed by its async replacement."""
    import time

    blocking_seconds = 5
    # time.sleep blocks the thread, so the whole event loop stalls for the
    # duration and no other coroutine can run.
    time.sleep(blocking_seconds)
    # asyncio.sleep suspends only this coroutine; the loop stays free to
    # schedule other work in the meantime.
    await asyncio.sleep(5)
# ✅ 无法改写的同步阻塞代码:用run_in_executor桥接到线程池执行
# One shared, bounded pool for blocking work. Building a new executor on
# every call would leave its worker threads behind, because nothing ever
# shuts those executors down.
_SYNC_EXECUTOR = ThreadPoolExecutor(max_workers=4)


async def call_sync_heavy():
    """Run the blocking ``heavy_sync_function(arg)`` on a worker thread.

    ``heavy_sync_function`` and ``arg`` are not defined in this snippet.
    NOTE(review): they are presumably placeholders supplied by the
    surrounding module; confirm before use.

    Returns:
        Whatever ``heavy_sync_function(arg)`` returns.
    """
    # Inside a coroutine a loop is always running, and get_running_loop()
    # returns exactly that loop.
    loop = asyncio.get_running_loop()
    # The event loop stays responsive while the worker thread does the job.
    return await loop.run_in_executor(
        _SYNC_EXECUTOR,
        heavy_sync_function,
        arg,
    )