一、SQLAlchemy核心架构
SQLAlchemy分为Core和ORM两层。Core是底层SQL抽象,ORM在其上构建对象-关系映射。理解这两层边界,是掌握性能调优的前提。
1.1 Core vs ORM的执行链路
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session, declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(50))
email = Column(String(100))
engine = create_engine("postgresql://user:pass@localhost/db")
# Core层:直接执行SQL,速度最快
with engine.connect() as conn:
# 参数化查询,防注入
result = conn.execute(
text("SELECT * FROM users WHERE id = :id"),
{"id": 1}
)
row = result.fetchone()
# ORM层:对象化操作,开发效率高
with Session(engine) as session:
# 执行两次SQL:查询 + count
user = session.get(User, 1) # SELECT * FROM users WHERE id=1
# 执行一次SQL,带join
user = session.query(User).filter_by(id=1).first()
# ORM的N+1问题:
users = session.query(User).limit(100).all()
for user in users:
# ⚠️ 每个user.address访问都触发一次SQL
print(user.address.city)
# 解决:预加载
from sqlalchemy.orm import selectinload
users = (
session.query(User)
.options(selectinload(User.address)) # 预加载address
.limit(100).all()
)
# 执行:1次查询users + 1次查询addresses(IN子句)
# 总共2次SQL,替代101次
二、查询优化策略
2.1 分页的N种写法
# 方式一:OFFSET/LIMIT(简单但慢,N+1友好场景)
page = session.query(User).limit(20).offset(0).all()
# 缺点:OFFSET越大,数据库越慢(需扫描跳过行)
# 方式二:游标分页(性能稳定,推荐)
# 用id做游标,避免OFFSET扫描
last_id = 1000 # 上一页最后一条的id
cursor_page = (
session.query(User)
.filter(User.id > last_id) # 主键条件
.order_by(User.id)
.limit(20).all()
)
# 优点:O(1)复杂度,无论翻到哪一页都很快
# 缺点:只能往前翻,不能跳页
# 方式三:行号ROW_NUMBER(支持跳页)
from sqlalchemy import func, over
subq = (
session.query(
func.row_number().over(order_by=User.id).label("rn"),
User
).subquery()
)
paged = session.query(subq).filter(
subq.c.rn > 20
).filter(subq.c.rn <= 40).all()
2.2 批量写入优化
# ⚠️ ORM逐行插入:每条INSERT都单独事务
for user_data in users_data_list: # 10000条
session.add(User(**user_data)) # 逐行添加到session
session.commit() # 提交10000次INSERT
# 耗时:~30秒
# ✅ 批量插入:Core层bulk_insert
from sqlalchemy import insert
session.execute(
insert(User),
[
{"name": f"user{i}", "email": f"u{i}@test.com"}
for i in range(10000)
]
)
session.commit()
# 耗时:~0.5秒 ← 60倍提速!
# ✅ ORM的bulk_save_objects(中等效率)
session.bulk_save_objects(
[User(name=f"user{i}", email=f"u{i}@test.com") for i in range(10000)],
return_defaults=True # 返回自增id
)
session.commit()
# 耗时:~2秒
# ✅ bulk_insert_mappings(最优ORM方案)
session.bulk_insert_mappings(
User,
[{"name": f"user{i}", "email": f"u{i}@test.com"} for i in range(10000)]
)
session.commit()
三、连接池与并发
# 连接池配置(生产环境必须调优)
engine = create_engine(
"postgresql://user:pass@localhost/db",
pool_size=20, # 常驻连接数
max_overflow=30, # 超出pool_size的临时连接数
pool_pre_ping=True, # 每次取连接前ping检测可用性
pool_recycle=3600, # 1小时后回收连接(防MySQL连接超时)
echo=False, # True=打印所有SQL(开发用)
)
# with engine.connect() 的生命周期:
# conn = engine.connect() → 从池取连接
# conn.execute(...) → 使用连接
# conn.close() → 归还到池(不是真正关闭)
# session vs engine.connect() 的区别:
# engine.connect() → Core层,最小封装
# session → ORM层,包含事务管理、对象追踪
# 正确的Session使用模式(SQLAlchemy 2.0)
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
engine = create_engine(url, pool_pre_ping=True)
Session = sessionmaker(bind=engine)
with Session() as session: # 自动commit/rollback
user = session.get(User, 1)
user.name = "new_name"
# 自动commit后,session.close()归还连接到池