跳转至

快速开始

本指南帮助您在 5 分钟内快速上手 SQLAlchemy CRUD Plus。

安装

pip install sqlalchemy-crud-plus

基础配置

数据库配置

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

# Database connection URL (SQLite accessed through the aiosqlite async driver).
DATABASE_URL = "sqlite+aiosqlite:///./app.db"

engine = create_async_engine(DATABASE_URL)
# expire_on_commit=False keeps already-loaded attributes readable after
# commit(). With the default (True) the next attribute access triggers an
# implicit refresh, which cannot run under asyncio and raises MissingGreenlet.
async_session = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

# Declarative base class; the User and Post models below inherit from it.
class Base(DeclarativeBase):
    pass

async def get_session():
    """Yield a session created by ``async_session``.

    This is an async generator: the session is opened with ``async with``,
    handed to the caller, and the ``async with`` block is exited once the
    generator is resumed and finishes.
    """
    async with async_session() as session:
        yield session

模型定义

from datetime import datetime
from sqlalchemy import String, DateTime, Boolean, ForeignKey, func
from sqlalchemy.orm import Mapped, mapped_column, relationship

# ORM model mapped to the 'users' table.
class User(Base):
    __tablename__ = 'users'

    # Primary key.
    id: Mapped[int] = mapped_column(primary_key=True)
    # Name, stored as a string of up to 50 characters.
    name: Mapped[str] = mapped_column(String(50))
    # Email, up to 100 characters; declared unique.
    email: Mapped[str] = mapped_column(String(100), unique=True)
    # Active flag; defaults to True when no value is supplied.
    is_active: Mapped[bool] = mapped_column(default=True)
    # Creation timestamp; the server-side default is func.now().
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())

    # Posts belonging to this user; the other side of Post.author.
    posts: Mapped[list["Post"]] = relationship(back_populates="author")

# ORM model mapped to the 'posts' table.
class Post(Base):
    __tablename__ = 'posts'

    # Primary key.
    id: Mapped[int] = mapped_column(primary_key=True)
    # Title, up to 200 characters.
    title: Mapped[str] = mapped_column(String(200))
    # Body text, up to 1000 characters.
    content: Mapped[str] = mapped_column(String(1000))
    # Foreign key referencing users.id.
    author_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
    # Creation timestamp; the server-side default is func.now().
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())

    # The owning user; the other side of User.posts.
    author: Mapped[User] = relationship(back_populates="posts")

Pydantic 模式

from pydantic import BaseModel
from typing import Optional

# Input schema for creating a user: name and email are required,
# is_active defaults to True.
class UserCreate(BaseModel):
    name: str
    email: str
    is_active: bool = True

# Input schema for updating a user: every field is optional and defaults to None.
class UserUpdate(BaseModel):
    name: Optional[str] = None
    email: Optional[str] = None
    is_active: Optional[bool] = None

# Input schema for creating a post; all three fields are required.
class PostCreate(BaseModel):
    title: str
    content: str
    author_id: int

基本使用

创建 CRUD 实例

from sqlalchemy_crud_plus import CRUDPlus

# One CRUDPlus instance per ORM model, constructed with the model class.
user_crud = CRUDPlus(User)
post_crud = CRUDPlus(Post)

创建记录

# NOTE: `session` is not defined in this snippet; presumably an AsyncSession
# such as the one yielded by get_session() — confirm.

# Create a single user from a Pydantic schema instance
user_data = UserCreate(name="张三", email="zhangsan@example.com")
user = await user_crud.create_model(session, user_data)

# Bulk create from a list of schema instances
users_data = [
    UserCreate(name="李四", email="lisi@example.com"),
    UserCreate(name="王五", email="wangwu@example.com")
]
users = await user_crud.create_models(session, users_data)

查询记录

# 根据主键查询
user = await user_crud.select_model(session, pk=1)

# 根据字段查询
user = await user_crud.select_model_by_column(session, email="zhangsan@example.com")

# 查询多个记录
users = await user_crud.select_models(session, is_active=True)

# 分页查询
users = await user_crud.select_models(session, limit=10, offset=0)

# 过滤查询
users = await user_crud.select_models(
    session,
    name__like="%张%",
    created_at__ge="2024-01-01"
)

更新记录

# Update by primary key, passing a Pydantic schema instance
update_data = UserUpdate(name="新名称")
await user_crud.update_model(session, pk=1, obj=update_data)

# Update by primary key, passing a plain dict
await user_crud.update_model(session, pk=1, obj={"is_active": False})

# Conditional update: `obj` carries the new values, the remaining keyword
# argument (name="李四") is the filter condition
await user_crud.update_model_by_column(
    session,
    obj={"is_active": True},
    name="李四"
)

删除记录

# Delete by primary key
await user_crud.delete_model(session, pk=1)

# Conditional delete
# NOTE(review): is_active=False can match several rows; check whether the
# library needs an explicit opt-in (e.g. allow_multiple=True) for multi-row
# deletes — confirm against the API reference.
await user_crud.delete_model_by_column(session, is_active=False)

统计查询

# 统计记录数
total = await user_crud.count(session)
active_count = await user_crud.count(session, is_active=True)

# 检查记录是否存在
exists = await user_crud.exists(session, email="test@example.com")
if not exists:
    user = await user_crud.create_model(session, user_data)

关系查询

# Eager-load relationship data (relationship names given as a list)
users = await user_crud.select_models(
    session,
    load_strategies=['posts']
)

# JOIN query
users = await user_crud.select_models(
    session,
    join_conditions=['posts']
)

# Choose the loading strategy per relationship (relationship name -> strategy name)
user = await user_crud.select_model(
    session,
    pk=1,
    load_strategies={
        'posts': 'selectinload'
    }
)

# Query-building methods; the result is assigned to `stmt`
# (presumably a SQLAlchemy statement to execute yourself — confirm).
# `== True` is intentional: on a mapped column it builds a SQL expression.
stmt = await user_crud.select(
    User.is_active == True,
    load_strategies=['posts']
)

# Statement sorted by created_at in descending order, per the arguments
stmt = await user_crud.select_order(
    sort_columns='created_at',
    sort_orders='desc'
)

高性能批量操作

# High-performance bulk create (from plain dicts)
users_dict = [
    {"name": "用户1", "email": "user1@example.com"},
    {"name": "用户2", "email": "user2@example.com"},
    {"name": "用户3", "email": "user3@example.com"}
]
users = await user_crud.bulk_create_models(session, users_dict)

# Bulk update with different data per record
# (presumably the "id" key selects the row to update — confirm)
users_update = [
    {"id": 1, "name": "新名称1", "email": "new1@example.com"},
    {"id": 2, "name": "新名称2", "email": "new2@example.com"}
]
await user_crud.bulk_update_models(session, users_update)

完整示例

import asyncio
from datetime import datetime
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy import String, DateTime, Boolean, ForeignKey, func
from pydantic import BaseModel
from sqlalchemy_crud_plus import CRUDPlus

# Database configuration (SQLite accessed through the aiosqlite async driver)
DATABASE_URL = "sqlite+aiosqlite:///./example.db"
engine = create_async_engine(DATABASE_URL)
# expire_on_commit=False is required by this example: main() reads user.name,
# user.id and post.title after commit(). With the default (True) those reads
# trigger an implicit refresh, which raises MissingGreenlet under asyncio.
async_session = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

# Declarative base class; its metadata is used by main() to create the tables.
class Base(DeclarativeBase):
    pass

# Model definitions
# User: ORM model mapped to the 'users' table.
class User(Base):
    __tablename__ = 'users'

    # Primary key.
    id: Mapped[int] = mapped_column(primary_key=True)
    # Name, up to 50 characters.
    name: Mapped[str] = mapped_column(String(50))
    # Email, up to 100 characters; declared unique.
    email: Mapped[str] = mapped_column(String(100), unique=True)
    # Active flag; defaults to True when no value is supplied.
    is_active: Mapped[bool] = mapped_column(default=True)
    # Creation timestamp; the server-side default is func.now().
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())

    # Posts belonging to this user; the other side of Post.author.
    posts: Mapped[list["Post"]] = relationship(back_populates="author")

# Post: ORM model mapped to the 'posts' table.
class Post(Base):
    __tablename__ = 'posts'

    # Primary key.
    id: Mapped[int] = mapped_column(primary_key=True)
    # Title, up to 200 characters.
    title: Mapped[str] = mapped_column(String(200))
    # Body text, up to 1000 characters.
    content: Mapped[str] = mapped_column(String(1000))
    # Foreign key referencing users.id.
    author_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
    # Creation timestamp; the server-side default is func.now().
    created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())

    # The owning user; the other side of User.posts.
    author: Mapped[User] = relationship(back_populates="posts")

# Pydantic schemas
# UserCreate: input for creating a user; both fields are required.
class UserCreate(BaseModel):
    name: str
    email: str

# PostCreate: input for creating a post; all three fields are required.
class PostCreate(BaseModel):
    title: str
    content: str
    author_id: int

async def main():
    """Run the end-to-end demo.

    Creates the tables, inserts one user and one post, reloads the user with
    its posts, then prints the user count and an email-existence check.
    """
    # Create all tables registered on Base.metadata
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # Create CRUD instances
    user_crud = CRUDPlus(User)
    post_crud = CRUDPlus(Post)

    async with async_session() as session:
        # Create a user
        user_data = UserCreate(name="张三", email="zhangsan@example.com")
        user = await user_crud.create_model(session, user_data)
        await session.commit()
        # NOTE(review): attributes are read after commit() here and below;
        # under AsyncSession this needs a sessionmaker created with
        # expire_on_commit=False (or an explicit refresh) — confirm the
        # session configuration.
        print(f"创建用户: {user.name}")

        # Create a post owned by that user
        post_data = PostCreate(
            title="我的第一篇文章",
            content="这是文章内容",
            author_id=user.id
        )
        post = await post_crud.create_model(session, post_data)
        await session.commit()
        print(f"创建文章: {post.title}")

        # Query the user together with their posts
        user_with_posts = await user_crud.select_model(
            session,
            pk=user.id,
            load_strategies=['posts']
        )
        # The name and the post count are separate parts of the sentence;
        # without the word between them the output reads e.g. "张三1".
        print(f"用户 {user_with_posts.name} 有 {len(user_with_posts.posts)} 篇文章")

        # Count and existence check
        total_users = await user_crud.count(session)
        print(f"总用户数: {total_users}")

        email_exists = await user_crud.exists(session, email="zhangsan@example.com")
        print(f"邮箱存在: {email_exists}")

# Run the demo only when the file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

下一步