- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
SQLAlchemy
允许开发者通过 Python 代码与数据库进行交互,而无需直接编写 SQL 语句,同时也支持直接使用原生 SQL 进行复杂查询。下面是
SQLAlchemy和我们常规数据库对象的对应关系说明。
Table
对象或 Declarative Base
中的类来表示。declarative_base()
。from sqlalchemy import Column, Integer, String, create_engine from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() class User(Base): __tablename__ = 'users' # 数据库表名 id = Column(Integer, primary_key=True) name = Column(String) email = Column(String)
Column
对象来表示。Column
对象,并作为类的属性定义。id = Column(Integer, primary_key=True)
name = Column(String(50))
new_user = User(id=1, name='John Doe', email='john@example.com')
primary_key=True
参数定义主键。id = Column(Integer, primary_key=True)
ForeignKey
对象来表示。ForeignKey
指定关系,指向另一个表的主键列。from sqlalchemy import ForeignKey from sqlalchemy.orm import relationship class Address(Base): __tablename__ = 'addresses' id = Column(Integer, primary_key=True) user_id = Column(Integer, ForeignKey('users.id')) user = relationship('User')
relationship
对象来表示。relationship
来定义。addresses = relationship("Address", back_populates="user")
Session
对象进行事务性操作(如查询、插入、更新、删除)。Session
对象类似于数据库连接对象,用于与数据库进行交互。from sqlalchemy.orm import sessionmaker Session = sessionmaker(bind=engine) session = Session() session.add(new_user) session.commit()
通过以上对应关系,SQLAlchemy允许开发者以面向对象的方式与数据库交互,提供了一个Pythonic的接口来操作数据库.
SQLAlchemy 提供了同步和异步两种操作方式,分别适用于不同的应用场景。以下是如何封装 SQLAlchemy 的同步和异步操作的方法说明:
在同步操作中,SQLAlchemy 使用传统的阻塞方式进行数据库操作。首先,定义一个基础的 Session 和 Engine 对象:
from sqlalchemy import create_engine from sqlalchemy.orm import declarative_base, sessionmaker from typing import Generator from core.config import settings # 常规同步处理 engine = create_engine(settings.DB_URI) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) def get_db() -> Generator: """创建一个 SQLAlchemy 数据库会话-同步处理.""" try: db = SessionLocal() yield db finally: db.close()
前面说了,使用SQLAlchemy可以实现不同数据库的统一模型的处理,我们可以对应创建不同数据库的连接(engine),如下是常规几种关系型数据库的连接处理.
# mysql 数据库引擎 engine = create_engine( "mysql+pymysql://root:123456@127.0.0.1:3306/WinFramework", pool_recycle=3600, # echo=True, ) # Sqlite 数据库引擎 engine = create_engine("sqlite:///testdir//test.db") # PostgreSQL 数据库引擎 engine = create_engine( "postgresql+psycopg2://postgres:123456@localhost:5432/winframework", # echo=True, ) # SQLServer 数据库引擎 engine = create_engine( "mssql+pymssql://sa:123456@localhost/WinFramework?tds_version=7.0", # echo=True, )
我们可以根据数据库的CRUD操作方式,封装一些操作,如下所示.
class CRUDOperations: def __init__(self, model): self.model = model def create(self, db, obj_in): db_obj = self.model(**obj_in.dict()) db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def get(self, db, id): return db.query(self.model).filter(self.model.id == id).first() def update(self, db, db_obj, obj_in): obj_data = obj_in.dict(exclude_unset=True) for field in obj_data: setattr(db_obj, field, obj_data[field]) db.commit() db.refresh(db_obj) return db_obj def remove(self, db, id): obj = db.query(self.model).get(id) db.delete(obj) db.commit() return obj
使用时,构建数据访问类进行操作,如下测试代码所示.
crud_user = CRUDOperations(User) # Create with get_db() as db: user = crud_user.create(db, user_data) # Read with get_db() as db: user = crud_user.get(db, user_id) # Update with get_db() as db: user = crud_user.update(db, user, user_data) # Delete with get_db() as db: crud_user.remove(db, user_id)
。
对于异步操作,SQLAlchemy 使用 AsyncSession 来管理异步事务.
首先,定义一个异步的 Session 和 Engine 对象:
from sqlalchemy import create_engine, URL from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from typing import AsyncGenerator def create_engine_and_session(url: str | URL): try: # 数据库引擎 engine = create_async_engine(url, pool_pre_ping=True) except Exception as e: print("❌ 数据库链接失败 {}", e) sys.exit() else: db_session = async_sessionmaker( bind=engine, autoflush=False, expire_on_commit=False ) return engine, db_session # 异步处理 async_engine, async_session = create_engine_and_session(settings.DB_URI_ASYNC) async def get_db() -> AsyncGenerator[AsyncSession, None]: """创建一个 SQLAlchemy 数据库会话-异步处理.""" async with async_session() as session: yield session
和同步的处理类似,不过是换了一个对象来实现,并且函数使用了async await的组合来实现异步操作.
为了实现我的SQLSugar开发框架类似的封装模式,我们参考SQLSugar开发框架中基类CRUD的定义方式来实现多种接口的封装处理.
参照上面的实现方式,我们来看看Python中使用泛型的处理封装类的代码.
ModelType = TypeVar("ModelType", bound=Base) PrimaryKeyType = TypeVar("PrimaryKeyType", int, str, float) # 限定主键的类型 PageDtoType = TypeVar("PageDtoType", bound=BaseModel) DtoType = TypeVar("DtoType", bound=BaseModel) class BaseCrud(Generic[ModelType, PrimaryKeyType, PageDtoType, DtoType]): """ 基础CRUD操作类 """ def __init__(self, model: Type[ModelType]): """ 数据库访问操作的基类对象(CRUD). **Parameters** * `model`: A SQLAlchemy model class """ self.model = model
这样,我们就可以通过泛型定义不同的类型,以及相关的处理类的信息.
该基类函数中,异步定义get_all的返回所有的数据接口如下所示.
async def get_all( self, sorting: Optional[str], db: AsyncSession ) -> List[ModelType] | None: """根据ID字符串列表获取对象列表 :param sorting: 格式:name asc 或 name asc,age desc """ query = select(self.model) if sorting: query = self.apply_sorting(query, sorting) result = await db.execute(query) items = result.scalars().all() return items
而对应获得单个对象的操作函数,如下所示.
async def get(self, id: PrimaryKeyType, db: AsyncSession) -> Optional[ModelType]: """根据主键获取一个对象""" query = select(self.model).filter(self.model.id == id) result = await db.execute(query) item = result.scalars().first() return item
而创建对象的操作函数,如下所示.
async def create(self, obj_in: DtoType, db: AsyncSession, **kwargs) -> bool: """创建对象,使用 kwargs 时可以扩展创建对象时的字段。 :param obj_in: 对象输入数据 :param kwargs: 扩展字段,如格式: is_deleted=0, is_active=1 """ try: if kwargs: instance = self.model(**obj_in.model_dump(), **kwargs) else: instance = self.model(**obj_in.model_dump()) # type: ignore db.add(instance) await db.commit() return True except SQLAlchemyError as e: print(e) await db.rollback() return False
这个异步函数 create 旨在通过 SQLAlchemy 在数据库中创建一个对象,同时允许通过 kwargs 参数动态扩展创建对象时的字段.
async def
: 表明这是一个异步函数,可以与 await
一起使用。self
: 这是一个类的方法,因此 self
引用类的实例。obj_in: DtoType
: obj_in
是一个数据传输对象(DTO),它包含了需要插入到数据库中的数据。DtoType
是一个泛型类型,用于表示 DTO 对象。db: AsyncSession
: db
是一个 SQLAlchemy 的异步会话(AsyncSession
),用于与数据库进行交互。**kwargs
: 接受任意数量的关键字参数,允许在对象创建时动态传入额外的字段。obj_in.model_dump()
: 假设 obj_in
是一个 Pydantic 模型或类似结构,它可以通过 model_dump()
方法转换为字典格式,用于创建 SQLAlchemy 模型实例。self.model(**obj_in.model_dump(), **kwargs)
: 使用 obj_in
中的字段以及通过 kwargs
传入的扩展字段来实例化 SQLAlchemy 模型对象。如果 kwargs
非空,它们会被解包并作为额外的字段传入模型构造函数。db.add(instance)
: 将新创建的对象添加到当前的数据库会话中。await db.commit()
: 提交事务,将新对象保存到数据库。SQLAlchemyError
: 捕获所有 SQLAlchemy 相关的错误。await db.rollback()
: 在发生异常时,回滚事务,以防止不完整或错误的数据被提交。通过上面的封装,我们可以测试调用的处理例子 。
from crud.customer import customer as customer_crud from models.customer import Customer from pydantic import BaseModel from schemas.customer import CustomerDto, CustomerPageDto async def test_list_customer(): async with get_db() as db: print("get_list") totalCount, items = await customer_crud.get_list( CustomerPageDto(skipCount=0, maxResultCount=10, name="test"), db, ) print(totalCount, items) for customer in customers: print(customer.name, customer.age) print("get_by_name") name = "test" customer = await customer_crud.get_by_name( name, db, ) if customer: print(customer.name, customer.age) else: print(f"{name} not found") print("soft delete") result = await customer_crud.delete_byid(customer.id, db, is_deleted=1) print("操作结果:", result) print("soft delete_byids") result = await customer_crud.delete_byids( ["11122", "2C5F8672-2AA7-4B14-85AD-DF56F5BF7F1F"], db, is_deleted=1 ) print(f"Soft delete successful: {result}") print("update_by_column") result = await customer_crud.update_by_column( "id", customer.id, {"age": 30}, db ) print("操作结果:", result) await db.close()
同步和异步处理的差异:
FastAPI
,可以提高高并发场景下的性能。通过封装数据库操作,可以让代码更具复用性和可维护性,支持不同类型的操作场景.
最后此篇关于Python开发中,SQLAlchemy的同步操作和异步操作封装,以及常规CRUD的处理。的文章就讲到这里了,如果你想了解更多关于Python开发中,SQLAlchemy的同步操作和异步操作封装,以及常规CRUD的处理。的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在努力做到这一点 在我的操作中从数据库获取对象列表(确定) 在 JSP 上打印(确定) 此列表作为 JSP 中的可编辑表出现。我想修改然后将其提交回同一操作以将其保存在我的数据库中(失败。当我使用
我有以下形式的 Linq to Entities 查询: var x = from a in SomeData where ... some conditions ... select
我有以下查询。 var query = Repository.Query() .Where(p => !p.IsDeleted && p.Article.ArticleSections.Cou
我正在编写一个应用程序包,其中包含一个主类,其中主方法与GUI类分开,GUI类包含一个带有jtabbedpane的jframe,它有两个选项卡,第一个选项卡包含一个jtable,称为jtable1,第
以下代码产生错误 The nested query is not supported. Operation1='Case' Operation2='Collect' 问题是我做错了什么?我该如何解决?
我已经为 HA redis 集群(2 个副本、1 个主节点、3 个哨兵)设置了本地 docker 环境。只有哨兵暴露端口(10021、10022、10023)。 我使用的是 stackexchange
我正在 Desk.com 中构建一个“集成 URL”,它使用 Shopify Liquid 模板过滤器语法。对于开始日期为 7 天前而结束日期为现在的查询,此 URL 需要包含“开始日期”和“结束日期
你一定想过。然而情况却不理想,python中只能使用类似于 i++/i--等操作。 python中的自增操作 下面代码几乎是所有程序员在python中进行自增(减)操作的常用
我需要在每个使用 github 操作的手动构建中显示分支。例如:https://gyazo.com/2131bf83b0df1e2157480e5be842d4fb 我应该显示分支而不是一个。 最佳答
我有一个关于 Perl qr 运算符的问题: #!/usr/bin/perl -w &mysplit("a:b:c", /:/); sub mysplit { my($str, $patt
我已经使用 ArgoUML 创建了一个 ERD(实体关系图),我希望在一个类中创建两个操作,它们都具有 void 返回类型。但是,我只能创建一个返回 void 类型的操作。 例如: 我能够将 book
Github 操作仍处于测试阶段并且很新,但我希望有人可以提供帮助。我认为可以在主分支和拉取请求上运行 github 操作,如下所示: on: pull_request push: b
我正在尝试创建一个 Twilio 工作流来调用电话并记录用户所说的内容。为此,我正在使用 Record,但我不确定要在 action 参数中放置什么。 尽管我知道 Twilio 会发送有关调用该 UR
我不确定这是否可行,但值得一试。我正在使用模板缓冲区来减少使用此算法的延迟渲染器中光体积的过度绘制(当相机位于体积之外时): 使用廉价的着色器,将深度测试设置为 LEQUAL 绘制背面,将它们标记在模
有没有聪明的方法来复制 和 重命名 文件通过 GitHub 操作? 我想将一些自述文件复制到 /docs文件夹(:= 同一个 repo,不是远程的!),它们将根据它们的 frontmatter 重命名
我有一个 .csv 文件,其中第一列包含用户名。它们采用 FirstName LastName 的形式。我想获取 FirstName 并将 LastName 的第一个字符添加到它上面,然后删除空格。然
Sitecore 根据 Sitecore 树中定义的项目名称生成 URL, http://samplewebsite/Pages/Sample Page 但我们的客户有兴趣降低所有 URL(页面/示例
我正在尝试进行一些计算,但是一旦我输入金额,它就会完成。我只是希望通过单击按钮而不是自动发生这种情况。 到目前为止我做了什么: Angular JS - programming-fr
我的公司创建了一种在环境之间移动文件的复杂方法,现在我们希望将某些构建的 JS 文件(已转换和缩小)从一个 github 存储库移动到另一个。使用 github 操作可以实现这一点吗? 最佳答案 最简
在我的代码中,我创建了一个 JSONArray 对象。并向 JSONArray 对象添加了两个 JSONObject。我使用的是 json-simple-1.1.jar。我的代码是 package j
我是一名优秀的程序员,十分优秀!