gpt4 book ai didi

python - 如何在异步 sqlalchemy 中正确处理多对多?

转载 作者:行者123 更新时间:2023-12-04 13:08:33 78 4
gpt4 key购买 nike

我试图在表之间实现多对多关系。
使用 back_populates 时,特定用户的所有标签都应该出现在该用户的 tags 字段中。
表创建成功。
用户和标签也都已成功添加。
关联表(user_tag)中的记录同样写入成功。

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.util import await_only, greenlet_spawn

from sqlalchemy import Column, Table, ForeignKey
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.dialects.postgresql import VARCHAR, INTEGER

# Declarative base class; every ORM model in this script derives from it and
# registers its table on ``Base.metadata``.
Base = declarative_base()

# Plain association table for the many-to-many link between ``users`` and
# ``tags``. It has no mapped class; both relationships reference it through
# ``secondary=``.
user_tag = Table(
    'user_tag',
    Base.metadata,
    Column('user_id', INTEGER, ForeignKey('users.id')),
    Column('tag_id', INTEGER, ForeignKey('tags.id')),
)


class User(Base):
    """ORM model mapped to the ``users`` table.

    ``tags`` is the many-to-many relationship towards ``Tag``; it goes through
    the ``user_tag`` association table and is kept in sync with ``Tag.users``
    via ``back_populates``.
    """

    __tablename__ = 'users'

    id = Column(INTEGER, primary_key=True)
    name = Column(VARCHAR(32), nullable=False, unique=True)
    tags = relationship("Tag", secondary=user_tag, back_populates="users")


class Tag(Base):
    """ORM model mapped to the ``tags`` table.

    ``users`` is the reverse side of ``User.tags``; both sides share the
    ``user_tag`` association table.
    """

    __tablename__ = 'tags'

    id = Column(INTEGER, primary_key=True)
    tag = Column(VARCHAR(255), nullable=False, unique=True)
    users = relationship("User", secondary=user_tag, back_populates="tags")


async def main():
    """Recreate the schema, insert sample users/tags/links, then read tags.

    This is the failing program from the question and is reproduced as-is:
    the final ``users[0].tags`` access raises ``MissingGreenlet`` (see the
    traceback quoted in this post).

    NOTE(review): the original paste had no indentation; the nesting below is
    reconstructed and should be confirmed against the original question.
    """
    engine = create_async_engine(
        "postgresql+asyncpg://postgres:pgs12345@localhost/test",
        echo=False,
    )

    # Start every run from an empty schema: drop everything, then recreate.
    async with engine.begin() as connection:
        for schema_op in (Base.metadata.drop_all, Base.metadata.create_all):
            await connection.run_sync(schema_op)

    users = [User(name=name) for name in ("p1", "p2", "p3")]
    tags = [Tag(tag=label) for label in ("tag1", "tag2", "tag3")]

    async with AsyncSession(engine) as session:
        # The inner transaction commits when this block exits.
        async with session.begin():
            session.add_all(users)
            session.add_all(tags)

        # Reload every object explicitly (users first, then tags); presumably
        # needed because attributes are expired after the commit above.
        for row in (*users, *tags):
            await session.refresh(row)

        # Link each user to the tag ids from 3 down to the user's own id.
        for user in users:
            for tag_id in range(3, user.id - 1, -1):
                link = user_tag.insert().values(user_id=user.id, tag_id=tag_id)
                await session.execute(link)
        await session.commit()

        for row in (*users, *tags):
            await session.refresh(row)

        # This is where the program crashes: ``users[0].tags`` is evaluated
        # first, as an ordinary attribute access, so the relationship
        # lazy-load tries to run SQL outside a greenlet and raises
        # MissingGreenlet before greenlet_spawn() is ever called.
        tags = await greenlet_spawn(users[0].tags)
        print(tags)


# Run the demo only when the file is executed as a script. main() drops and
# recreates every table, so it must not run as a side effect of an import.
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
当我运行程序时,它崩溃了:
 File "C:\Sources\asyncSQLAl test\main.py", line 48, in <module>
loop.run_until_complete(main())
File "C:\Users\Stanislav\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line
642, in run_until_complete
return future.result()
File "C:\Sources\asyncSQLAl test\main.py", line 41, in main
tags = await greenlet_spawn(await users[0].tags)
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\orm\attributes.py", line 480, in __get__
return self.impl.get(state, dict_)
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\orm\attributes.py", line 931, in get
value = self.callable_(state, passive)
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\orm\strategies.py", line 879, in _load_for_state
return self._emit_lazyload(
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\orm\strategies.py", line 1036,
in _emit_lazyload
result = session.execute(
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\orm\session.py", line 1689, in
execute
result = conn._execute_20(statement, params or {}, execution_options)
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1582, in
_execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\sql\lambdas.py", line 481, in _execute_on_connection
return connection._execute_clauseelement(
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1451, in
_execute_clauseelement
ret = self._execute_context(
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1813, in
_execute_context
self._handle_dbapi_exception(
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1998, in
_handle_dbapi_exception
util.raise_(exc_info[1], with_traceback=exc_info[2])
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\util\compat.py", line 207, in raise_
raise exception
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1770, in
_execute_context
self.dialect.do_execute(
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\engine\default.py", line 717, in do_execute
cursor.execute(statement, parameters)
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py", line 449, in execute
self._adapt_connection.await_(
File "C:\Sources\asyncSQLAl test\venv\lib\site-packages\sqlalchemy\util\_concurrency_py3k.py", line 60, in await_only
raise exc.MissingGreenlet(
sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_() here. Was IO attempted in an unexpected place? (Background on this error at: http://sqlalche.me/e/14/xd2s)
sys:1: RuntimeWarning: coroutine 'AsyncAdapt_asyncpg_cursor._prepare_and_execute' was never awaited
我不太明白 greenlet_spawn 在这里是如何工作的,以及在这个例子中应该在哪里使用它。
作为对比,下面是同一个程序的同步版本:
from sqlalchemy import Column, Table, ForeignKey
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import VARCHAR, INTEGER

# Declarative base for the synchronous variant; the models below register
# their tables on ``Base.metadata``.
Base = declarative_base()

# Association table backing the many-to-many relationship between ``users``
# and ``tags``; both ``relationship()`` calls name it via ``secondary=``.
user_tag = Table(
    'user_tag',
    Base.metadata,
    Column('user_id', INTEGER, ForeignKey('users.id')),
    Column('tag_id', INTEGER, ForeignKey('tags.id')),
)


class User(Base):
    """ORM model for the ``users`` table (synchronous example).

    ``tags`` reaches ``Tag`` rows through the ``user_tag`` association table
    and is paired with ``Tag.users`` via ``back_populates``.
    """

    __tablename__ = 'users'

    id = Column(INTEGER, primary_key=True)
    name = Column(VARCHAR(32), nullable=False, unique=True)
    tags = relationship("Tag", secondary=user_tag, back_populates="users")


class Tag(Base):
    """ORM model for the ``tags`` table (synchronous example).

    ``users`` is the reverse side of ``User.tags`` over the ``user_tag``
    association table.
    """

    __tablename__ = 'tags'

    id = Column(INTEGER, primary_key=True)
    tag = Column(VARCHAR(255), nullable=False, unique=True)
    users = relationship("User", secondary=user_tag, back_populates="tags")

    def __str__(self):
        """Return the tag text, so printing a ``Tag`` shows its name."""
        return self.tag


def main():
    """Synchronous counterpart of the async demo.

    Recreates the schema, inserts three users and three tags, links them
    through ``user_tag`` and prints the tags of the first user. Here the
    lazy-load of ``users[0].tags`` works, which is the contrast the post makes.

    NOTE(review): the original paste had no indentation; the nesting below is
    reconstructed and should be confirmed against the original question.
    """
    engine = create_engine(
        "postgresql+psycopg2://postgres:pgs12345@localhost/test",
        echo=False,
    )

    # Start from an empty schema on every run.
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)

    session_factory = sessionmaker(bind=engine)
    session = session_factory()

    users = [User(name=name) for name in ("p1", "p2", "p3")]
    tags = [Tag(tag=label) for label in ("tag1", "tag2", "tag3")]

    # The transaction commits when this block exits.
    with session.begin():
        session.add_all(users)
        session.add_all(tags)

    # Link each user to the tag ids from 3 down to the user's own id.
    for user in users:
        for tag_id in range(3, user.id - 1, -1):
            link = user_tag.insert().values(user_id=user.id, tag_id=tag_id)
            session.execute(link)
    session.commit()

    # Accessing the relationship lazy-loads it through the sync session.
    for tag in users[0].tags:
        print(tag, end=" ")

# Run the demo only when the file is executed as a script. main() drops and
# recreates every table, so it must not run as a side effect of an import.
if __name__ == "__main__":
    main()
输出为:
tag1 tag2 tag3 

最佳答案

我今天也一直被这个问题卡住,最终把原因缩小到:代码尝试了延迟加载(lazy load),而 greenlet 机制不允许在这里这样做。我不确定这是否只是因为我经验不足,但我发现这篇文章详细介绍了一些常见错误:
https://matt.sh/sqlalchemy-the-async-ening ,其中提到这个问题会以这种方式发生。此外,文档详细介绍了需要避免延迟加载:https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html .
我目前的解决办法是:在最初查询 Parent 对象时就预先加载(prefetch)它的 Child 关系,之后再基于已加载的数据进行操作。我不确定这算不算一个真正的 bug(即:既然同步方式下可以工作,异步方式下也理应可以),还是仅仅是异步方式本身的限制。
编辑 06/08/21,这是我预取关系的方式:

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from . import models

async def get_parent_prefetch_children(db: AsyncSession, parent_id: int) -> models.Parent:
    """Fetch one ``Parent`` by id with its ``children`` loaded eagerly.

    The ``selectinload`` option loads the relationship as part of this call,
    so reading ``parent.children`` afterwards does not trigger a lazy load,
    which is what fails under asyncio.

    Args:
        db: Async session used to run the query.
        parent_id: Value matched against ``models.Parent.id``.

    Returns:
        The result of ``Result.scalar()`` on the query.
        NOTE(review): per SQLAlchemy's documentation this is ``None`` when no
        row matches, which the return annotation does not reflect; confirm
        how callers handle that.
    """
    eager_children = selectinload(models.Parent.children)
    query = (
        sa.select(models.Parent)
        .where(models.Parent.id == parent_id)
        .options(eager_children)
    )
    rows = await db.execute(query)
    return rows.scalar()
在您的情况下,您访问了 users[0].tags,这会触发延迟加载并导致失败。为了避免这种情况,您必须重新查询用户,并预先(eager)加载他们的标签。

关于python - 如何在异步 sqlalchemy 中正确处理多对多?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68195361/

78 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com