Python数据库编程与ORM一、数据库连接基础Python通过DB-API 2.0规范PEP 249统一了数据库接口。不同数据库使用不同的驱动但API一致。import sqlite3# SQLite内置无需安装conn sqlite3.connect(example.db)cursor conn.cursor()# 创建表cursor.execute(CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY AUTOINCREMENT,name TEXT NOT NULL,email TEXT UNIQUE,age INTEGER,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP))# 插入数据使用参数化查询防止SQL注入cursor.execute(INSERT INTO users (name, email, age) VALUES (?, ?, ?),(Alice, aliceexample.com, 30))# 批量插入users [(Bob, bobexample.com, 25),(Charlie, charlieexample.com, 35),(Diana, dianaexample.com, 28),]cursor.executemany(INSERT INTO users (name, email, age) VALUES (?, ?, ?),users)conn.commit()# 查询cursor.execute(SELECT * FROM users WHERE age ?, (26,))rows cursor.fetchall()for row in rows:print(row)conn.close()二、上下文管理器与连接池2.1 安全的数据库操作from contextlib import contextmanagercontextmanagerdef get_db_connection(db_path):conn sqlite3.connect(db_path)conn.row_factory sqlite3.Row # 返回字典风格的行try:yield connconn.commit()except Exception:conn.rollback()raisefinally:conn.close()contextmanagerdef get_cursor(conn):cursor conn.cursor()try:yield cursorfinally:cursor.close()# 使用with get_db_connection(example.db) as conn:with get_cursor(conn) as cursor:cursor.execute(SELECT * FROM users)for row in cursor.fetchall():print(dict(row))2.2 简单连接池import queueimport threadingclass ConnectionPool:def __init__(self, db_path, max_connections5):self.db_path db_pathself.pool queue.Queue(maxsizemax_connections)self.lock threading.Lock()self._size 0self._max max_connectionsdef get_connection(self):try:return self.pool.get_nowait()except queue.Empty:with self.lock:if self._size self._max:self._size 1conn sqlite3.connect(self.db_path)conn.row_factory sqlite3.Rowreturn conn# 等待可用连接return self.pool.get(timeout10)def return_connection(self, conn):self.pool.put(conn)contextmanagerdef connection(self):conn self.get_connection()try:yield connconn.commit()except Exception:conn.rollback()raisefinally:self.return_connection(conn)# 使用pool ConnectionPool(example.db, max_connections10)with pool.connection() as conn:cursor conn.cursor()cursor.execute(SELECT COUNT(*) FROM users)print(cursor.fetchone()[0])三、MySQL与PostgreSQL3.1 MySQL使用pymysqlimport pymysqlconfig {host: localhost,port: 3306,user: root,password: password,database: mydb,charset: utf8mb4,cursorclass: pymysql.cursors.DictCursor,}conn pymysql.connect(**config)try:with conn.cursor() as cursor:cursor.execute(SELECT * FROM users WHERE status %s LIMIT %s,(active, 10))results cursor.fetchall()finally:conn.close()3.2 PostgreSQL使用psycopg2import psycopg2from psycopg2.extras import RealDictCursorconn psycopg2.connect(hostlocalhost,port5432,dbnamemydb,userpostgres,passwordpassword)with conn.cursor(cursor_factoryRealDictCursor) as cursor:cursor.execute(SELECT * FROM users WHERE age BETWEEN %s AND %s,(20, 30))users cursor.fetchall()conn.commit()conn.close()四、SQLAlchemy CoreSQLAlchemy提供两层APICoreSQL表达式和ORM对象关系映射。from sqlalchemy import create_engine, MetaData, Table, Columnfrom sqlalchemy import Integer, String, Float, DateTime, ForeignKeyfrom sqlalchemy import select, insert, update, delete, func, and_, or_# 创建引擎engine create_engine(sqlite:///example.db,echoFalse, # True会打印SQL语句pool_size5,max_overflow10,)# 定义表结构metadata MetaData()users Table(users, metadata,Column(id, Integer, primary_keyTrue),Column(name, String(50), nullableFalse),Column(email, String(100), uniqueTrue),Column(age, Integer),)orders Table(orders, metadata,Column(id, Integer, primary_keyTrue),Column(user_id, Integer, ForeignKey(users.id)),Column(amount, Float),Column(status, String(20)),)metadata.create_all(engine)# 使用Core APIwith engine.connect() as conn:# 插入conn.execute(insert(users).values(nameEve, emaileveexample.com, age22))# 查询stmt select(users).where(users.c.age 25).order_by(users.c.name)result conn.execute(stmt)for row in result:print(row.name, row.age)# 聚合stmt select(func.count(), func.avg(users.c.age)).select_from(users)count, avg_age conn.execute(stmt).fetchone()# 连接查询stmt (select(users.c.name, orders.c.amount).join(orders, users.c.id orders.c.user_id).where(orders.c.status completed))result conn.execute(stmt)conn.commit()五、SQLAlchemy ORMfrom sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationshipfrom sqlalchemy.orm import Session, sessionmakerfrom datetime import datetimefrom typing import Optionalclass Base(DeclarativeBase):passclass User(Base):__tablename__ usersid: Mapped[int] mapped_column(primary_keyTrue)name: Mapped[str] mapped_column(String(50))email: Mapped[str] mapped_column(String(100), uniqueTrue)age: Mapped[Optional[int]] mapped_column(defaultNone)created_at: Mapped[datetime] mapped_column(default_factorydatetime.now)# 关系orders: Mapped[list[Order]] relationship(back_populatesuser)def __repr__(self):return fUser(id{self.id}, name{self.name})class Order(Base):__tablename__ ordersid: Mapped[int] mapped_column(primary_keyTrue)user_id: Mapped[int] mapped_column(ForeignKey(users.id))amount: Mapped[float]status: Mapped[str] mapped_column(String(20), defaultpending)user: Mapped[User] relationship(back_populatesorders)def __repr__(self):return fOrder(id{self.id}, amount{self.amount})# 创建表Base.metadata.create_all(engine)# Session操作SessionLocal sessionmaker(bindengine)with SessionLocal() as session:# 创建user User(nameFrank, emailfrankexample.com, age32)session.add(user)session.flush() # 获取自动生成的IDorder Order(user_iduser.id, amount99.99)session.add(order)# 查询users session.query(User).filter(User.age 25).all()# 新式查询推荐from sqlalchemy import selectstmt select(User).where(User.age 25).order_by(User.name)users session.scalars(stmt).all()# 关联查询stmt (select(User).join(User.orders).where(Order.amount 50).distinct())users_with_big_orders session.scalars(stmt).all()# 更新user session.get(User, 1)if user:user.name Updated Name# 批量更新session.execute(update(User).where(User.age 20).values(statusminor))# 删除session.execute(delete(Order).where(Order.status cancelled))session.commit()六、数据库迁移Alembic# 安装: pip install alembic# 初始化: alembic init migrations# alembic.ini 配置# sqlalchemy.url sqlite:///example.db# migrations/env.py 中设置 target_metadata# target_metadata Base.metadata# 创建迁移脚本# alembic revision --autogenerate -m add phone column# 生成的迁移文件示例def upgrade():op.add_column(users, sa.Column(phone, sa.String(20)))def downgrade():op.drop_column(users, phone)# 执行迁移# alembic upgrade head# alembic downgrade -1七、Repository模式from abc import ABC, abstractmethodfrom typing import TypeVar, Generic, OptionalT TypeVar(T)class Repository(ABC, Generic[T]):abstractmethoddef get_by_id(self, id: int) - Optional[T]:passabstractmethoddef get_all(self) - list[T]:passabstractmethoddef add(self, entity: T) - T:passabstractmethoddef update(self, entity: T) - T:passabstractmethoddef delete(self, id: int) - None:passclass UserRepository(Repository[User]):def __init__(self, session: Session):self.session sessiondef get_by_id(self, id: int) - Optional[User]:return self.session.get(User, id)def get_all(self) - list[User]:return self.session.scalars(select(User)).all()def get_by_email(self, email: str) - Optional[User]:stmt select(User).where(User.email email)return self.session.scalars(stmt).first()def add(self, user: User) - User:self.session.add(user)self.session.flush()return userdef update(self, user: User) - User:self.session.merge(user)return userdef delete(self, id: int) - None:user self.get_by_id(id)if user:self.session.delete(user)def search(self, name_query: str, min_age: int 0) - list[User]:stmt (select(User).where(and_(User.name.ilike(f%{name_query}%),User.age min_age)).order_by(User.name))return self.session.scalars(stmt).all()八、异步数据库操作from sqlalchemy.ext.asyncio import create_async_engine, AsyncSessionfrom sqlalchemy.ext.asyncio import async_sessionmaker# 异步引擎async_engine create_async_engine(sqliteaiosqlite:///example.db)AsyncSessionLocal async_sessionmaker(async_engine, class_AsyncSession)async def get_users():async with AsyncSessionLocal() as session:stmt select(User).where(User.age 25)result await session.scalars(stmt)return result.all()async def create_user(name: str, email: str):async with AsyncSessionLocal() as session:user User(namename, emailemail)session.add(user)await session.commit()await session.refresh(user)return user九、查询优化# 1. 预加载关联数据避免N1问题from sqlalchemy.orm import joinedload, selectinload# 懒加载默认- 访问user.orders时才查询导致N1users session.scalars(select(User)).all()# JOIN预加载 - 一次查询获取所有数据stmt select(User).options(joinedload(User.orders))users session.scalars(stmt).unique().all()# 子查询预加载 - 两次查询stmt select(User).options(selectinload(User.orders))users session.scalars(stmt).all()# 2. 只查询需要的列stmt select(User.name, User.email).where(User.age 25)# 3. 分页stmt select(User).offset(20).limit(10)# 4. 使用索引class User(Base):__tablename__ users__table_args__ (Index(idx_user_email, email),Index(idx_user_age_name, age, name),)十、总结数据库编程要点1. 始终使用参数化查询防止SQL注入2. 使用连接池管理数据库连接3. 合理使用事务保证数据一致性4. ORM适合业务逻辑复杂的场景Core适合性能敏感的批量操作5. 注意N1查询问题合理使用预加载6. 使用Alembic管理数据库迁移
Python数据库编程与ORM
发布时间:2026/5/16 3:38:10
Python数据库编程与ORM一、数据库连接基础Python通过DB-API 2.0规范PEP 249统一了数据库接口。不同数据库使用不同的驱动但API一致。import sqlite3# SQLite内置无需安装conn sqlite3.connect(example.db)cursor conn.cursor()# 创建表cursor.execute(CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY AUTOINCREMENT,name TEXT NOT NULL,email TEXT UNIQUE,age INTEGER,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP))# 插入数据使用参数化查询防止SQL注入cursor.execute(INSERT INTO users (name, email, age) VALUES (?, ?, ?),(Alice, aliceexample.com, 30))# 批量插入users [(Bob, bobexample.com, 25),(Charlie, charlieexample.com, 35),(Diana, dianaexample.com, 28),]cursor.executemany(INSERT INTO users (name, email, age) VALUES (?, ?, ?),users)conn.commit()# 查询cursor.execute(SELECT * FROM users WHERE age ?, (26,))rows cursor.fetchall()for row in rows:print(row)conn.close()二、上下文管理器与连接池2.1 安全的数据库操作from contextlib import contextmanagercontextmanagerdef get_db_connection(db_path):conn sqlite3.connect(db_path)conn.row_factory sqlite3.Row # 返回字典风格的行try:yield connconn.commit()except Exception:conn.rollback()raisefinally:conn.close()contextmanagerdef get_cursor(conn):cursor conn.cursor()try:yield cursorfinally:cursor.close()# 使用with get_db_connection(example.db) as conn:with get_cursor(conn) as cursor:cursor.execute(SELECT * FROM users)for row in cursor.fetchall():print(dict(row))2.2 简单连接池import queueimport threadingclass ConnectionPool:def __init__(self, db_path, max_connections5):self.db_path db_pathself.pool queue.Queue(maxsizemax_connections)self.lock threading.Lock()self._size 0self._max max_connectionsdef get_connection(self):try:return self.pool.get_nowait()except queue.Empty:with self.lock:if self._size self._max:self._size 1conn sqlite3.connect(self.db_path)conn.row_factory sqlite3.Rowreturn conn# 等待可用连接return self.pool.get(timeout10)def return_connection(self, conn):self.pool.put(conn)contextmanagerdef connection(self):conn self.get_connection()try:yield connconn.commit()except Exception:conn.rollback()raisefinally:self.return_connection(conn)# 使用pool ConnectionPool(example.db, max_connections10)with pool.connection() as conn:cursor conn.cursor()cursor.execute(SELECT COUNT(*) FROM users)print(cursor.fetchone()[0])三、MySQL与PostgreSQL3.1 MySQL使用pymysqlimport pymysqlconfig {host: localhost,port: 3306,user: root,password: password,database: mydb,charset: utf8mb4,cursorclass: pymysql.cursors.DictCursor,}conn pymysql.connect(**config)try:with conn.cursor() as cursor:cursor.execute(SELECT * FROM users WHERE status %s LIMIT %s,(active, 10))results cursor.fetchall()finally:conn.close()3.2 PostgreSQL使用psycopg2import psycopg2from psycopg2.extras import RealDictCursorconn psycopg2.connect(hostlocalhost,port5432,dbnamemydb,userpostgres,passwordpassword)with conn.cursor(cursor_factoryRealDictCursor) as cursor:cursor.execute(SELECT * FROM users WHERE age BETWEEN %s AND %s,(20, 30))users cursor.fetchall()conn.commit()conn.close()四、SQLAlchemy CoreSQLAlchemy提供两层APICoreSQL表达式和ORM对象关系映射。from sqlalchemy import create_engine, MetaData, Table, Columnfrom sqlalchemy import Integer, String, Float, DateTime, ForeignKeyfrom sqlalchemy import select, insert, update, delete, func, and_, or_# 创建引擎engine create_engine(sqlite:///example.db,echoFalse, # True会打印SQL语句pool_size5,max_overflow10,)# 定义表结构metadata MetaData()users Table(users, metadata,Column(id, Integer, primary_keyTrue),Column(name, String(50), nullableFalse),Column(email, String(100), uniqueTrue),Column(age, Integer),)orders Table(orders, metadata,Column(id, Integer, primary_keyTrue),Column(user_id, Integer, ForeignKey(users.id)),Column(amount, Float),Column(status, String(20)),)metadata.create_all(engine)# 使用Core APIwith engine.connect() as conn:# 插入conn.execute(insert(users).values(nameEve, emaileveexample.com, age22))# 查询stmt select(users).where(users.c.age 25).order_by(users.c.name)result conn.execute(stmt)for row in result:print(row.name, row.age)# 聚合stmt select(func.count(), func.avg(users.c.age)).select_from(users)count, avg_age conn.execute(stmt).fetchone()# 连接查询stmt (select(users.c.name, orders.c.amount).join(orders, users.c.id orders.c.user_id).where(orders.c.status completed))result conn.execute(stmt)conn.commit()五、SQLAlchemy ORMfrom sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationshipfrom sqlalchemy.orm import Session, sessionmakerfrom datetime import datetimefrom typing import Optionalclass Base(DeclarativeBase):passclass User(Base):__tablename__ usersid: Mapped[int] mapped_column(primary_keyTrue)name: Mapped[str] mapped_column(String(50))email: Mapped[str] mapped_column(String(100), uniqueTrue)age: Mapped[Optional[int]] mapped_column(defaultNone)created_at: Mapped[datetime] mapped_column(default_factorydatetime.now)# 关系orders: Mapped[list[Order]] relationship(back_populatesuser)def __repr__(self):return fUser(id{self.id}, name{self.name})class Order(Base):__tablename__ ordersid: Mapped[int] mapped_column(primary_keyTrue)user_id: Mapped[int] mapped_column(ForeignKey(users.id))amount: Mapped[float]status: Mapped[str] mapped_column(String(20), defaultpending)user: Mapped[User] relationship(back_populatesorders)def __repr__(self):return fOrder(id{self.id}, amount{self.amount})# 创建表Base.metadata.create_all(engine)# Session操作SessionLocal sessionmaker(bindengine)with SessionLocal() as session:# 创建user User(nameFrank, emailfrankexample.com, age32)session.add(user)session.flush() # 获取自动生成的IDorder Order(user_iduser.id, amount99.99)session.add(order)# 查询users session.query(User).filter(User.age 25).all()# 新式查询推荐from sqlalchemy import selectstmt select(User).where(User.age 25).order_by(User.name)users session.scalars(stmt).all()# 关联查询stmt (select(User).join(User.orders).where(Order.amount 50).distinct())users_with_big_orders session.scalars(stmt).all()# 更新user session.get(User, 1)if user:user.name Updated Name# 批量更新session.execute(update(User).where(User.age 20).values(statusminor))# 删除session.execute(delete(Order).where(Order.status cancelled))session.commit()六、数据库迁移Alembic# 安装: pip install alembic# 初始化: alembic init migrations# alembic.ini 配置# sqlalchemy.url sqlite:///example.db# migrations/env.py 中设置 target_metadata# target_metadata Base.metadata# 创建迁移脚本# alembic revision --autogenerate -m add phone column# 生成的迁移文件示例def upgrade():op.add_column(users, sa.Column(phone, sa.String(20)))def downgrade():op.drop_column(users, phone)# 执行迁移# alembic upgrade head# alembic downgrade -1七、Repository模式from abc import ABC, abstractmethodfrom typing import TypeVar, Generic, OptionalT TypeVar(T)class Repository(ABC, Generic[T]):abstractmethoddef get_by_id(self, id: int) - Optional[T]:passabstractmethoddef get_all(self) - list[T]:passabstractmethoddef add(self, entity: T) - T:passabstractmethoddef update(self, entity: T) - T:passabstractmethoddef delete(self, id: int) - None:passclass UserRepository(Repository[User]):def __init__(self, session: Session):self.session sessiondef get_by_id(self, id: int) - Optional[User]:return self.session.get(User, id)def get_all(self) - list[User]:return self.session.scalars(select(User)).all()def get_by_email(self, email: str) - Optional[User]:stmt select(User).where(User.email email)return self.session.scalars(stmt).first()def add(self, user: User) - User:self.session.add(user)self.session.flush()return userdef update(self, user: User) - User:self.session.merge(user)return userdef delete(self, id: int) - None:user self.get_by_id(id)if user:self.session.delete(user)def search(self, name_query: str, min_age: int 0) - list[User]:stmt (select(User).where(and_(User.name.ilike(f%{name_query}%),User.age min_age)).order_by(User.name))return self.session.scalars(stmt).all()八、异步数据库操作from sqlalchemy.ext.asyncio import create_async_engine, AsyncSessionfrom sqlalchemy.ext.asyncio import async_sessionmaker# 异步引擎async_engine create_async_engine(sqliteaiosqlite:///example.db)AsyncSessionLocal async_sessionmaker(async_engine, class_AsyncSession)async def get_users():async with AsyncSessionLocal() as session:stmt select(User).where(User.age 25)result await session.scalars(stmt)return result.all()async def create_user(name: str, email: str):async with AsyncSessionLocal() as session:user User(namename, emailemail)session.add(user)await session.commit()await session.refresh(user)return user九、查询优化# 1. 预加载关联数据避免N1问题from sqlalchemy.orm import joinedload, selectinload# 懒加载默认- 访问user.orders时才查询导致N1users session.scalars(select(User)).all()# JOIN预加载 - 一次查询获取所有数据stmt select(User).options(joinedload(User.orders))users session.scalars(stmt).unique().all()# 子查询预加载 - 两次查询stmt select(User).options(selectinload(User.orders))users session.scalars(stmt).all()# 2. 只查询需要的列stmt select(User.name, User.email).where(User.age 25)# 3. 分页stmt select(User).offset(20).limit(10)# 4. 使用索引class User(Base):__tablename__ users__table_args__ (Index(idx_user_email, email),Index(idx_user_age_name, age, name),)十、总结数据库编程要点1. 始终使用参数化查询防止SQL注入2. 使用连接池管理数据库连接3. 合理使用事务保证数据一致性4. ORM适合业务逻辑复杂的场景Core适合性能敏感的批量操作5. 注意N1查询问题合理使用预加载6. 使用Alembic管理数据库迁移