一、SQLAlchemy核心架构

SQLAlchemy分为Core和ORM两层。Core是底层SQL抽象,ORM在其上构建对象-关系映射。理解这两层边界,是掌握性能调优的前提。

1.1 Core vs ORM的执行链路

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(100))

engine = create_engine("postgresql://user:pass@localhost/db")

# Core层:直接执行SQL,速度最快
with engine.connect() as conn:
    # 参数化查询,防注入
    result = conn.execute(
        text("SELECT * FROM users WHERE id = :id"),
        {"id": 1}
    )
    row = result.fetchone()

# ORM层:对象化操作,开发效率高
with Session(engine) as session:
    # 执行两次SQL:查询 + count
    user = session.get(User, 1)  # SELECT * FROM users WHERE id=1

    # 执行一次SQL,带join
    user = session.query(User).filter_by(id=1).first()

# ORM的N+1问题:
users = session.query(User).limit(100).all()
for user in users:
    # ⚠️ 每个user.address访问都触发一次SQL
    print(user.address.city)

# 解决:预加载
from sqlalchemy.orm import selectinload

users = (
    session.query(User)
    .options(selectinload(User.address))  # 预加载address
    .limit(100).all()
)
# 执行:1次查询users + 1次查询addresses(IN子句)
# 总共2次SQL,替代101次

二、查询优化策略

2.1 分页的N种写法

# 方式一:OFFSET/LIMIT(简单但慢,N+1友好场景)
page = session.query(User).limit(20).offset(0).all()
# 缺点:OFFSET越大,数据库越慢(需扫描跳过行)

# 方式二:游标分页(性能稳定,推荐)
# 用id做游标,避免OFFSET扫描
last_id = 1000  # 上一页最后一条的id
cursor_page = (
    session.query(User)
    .filter(User.id > last_id)  # 主键条件
    .order_by(User.id)
    .limit(20).all()
)
# 优点:O(1)复杂度,无论翻到哪一页都很快
# 缺点:只能往前翻,不能跳页

# 方式三:行号ROW_NUMBER(支持跳页)
from sqlalchemy import func, over

subq = (
    session.query(
        func.row_number().over(order_by=User.id).label("rn"),
        User
    ).subquery()
)
paged = session.query(subq).filter(
    subq.c.rn > 20
).filter(subq.c.rn <= 40).all()

2.2 批量写入优化

# ⚠️ ORM逐行插入:每条INSERT都单独事务
for user_data in users_data_list:  # 10000条
    session.add(User(**user_data))  # 逐行添加到session
session.commit()  # 提交10000次INSERT
# 耗时:~30秒

# ✅ 批量插入:Core层bulk_insert
from sqlalchemy import insert

session.execute(
    insert(User),
    [
        {"name": f"user{i}", "email": f"u{i}@test.com"}
        for i in range(10000)
    ]
)
session.commit()
# 耗时:~0.5秒  ← 60倍提速!

# ✅ ORM的bulk_save_objects(中等效率)
session.bulk_save_objects(
    [User(name=f"user{i}", email=f"u{i}@test.com") for i in range(10000)],
    return_defaults=True  # 返回自增id
)
session.commit()
# 耗时:~2秒

# ✅ bulk_insert_mappings(最优ORM方案)
session.bulk_insert_mappings(
    User,
    [{"name": f"user{i}", "email": f"u{i}@test.com"} for i in range(10000)]
)
session.commit()

三、连接池与并发

# 连接池配置(生产环境必须调优)
engine = create_engine(
    "postgresql://user:pass@localhost/db",
    pool_size=20,          # 常驻连接数
    max_overflow=30,       # 超出pool_size的临时连接数
    pool_pre_ping=True,    # 每次取连接前ping检测可用性
    pool_recycle=3600,     # 1小时后回收连接(防MySQL连接超时)
    echo=False,            # True=打印所有SQL(开发用)
)

# with engine.connect() 的生命周期:
# conn = engine.connect()  → 从池取连接
# conn.execute(...)       → 使用连接
# conn.close()             → 归还到池(不是真正关闭)

# session vs engine.connect() 的区别:
# engine.connect() → Core层,最小封装
# session → ORM层,包含事务管理、对象追踪

# 正确的Session使用模式(SQLAlchemy 2.0)
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

engine = create_engine(url, pool_pre_ping=True)
Session = sessionmaker(bind=engine)

with Session() as session:  # 自动commit/rollback
    user = session.get(User, 1)
    user.name = "new_name"
# 自动commit后,session.close()归还连接到池