- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我最近将 SQLAlchemy(带有 [asyncio] 包)更新到 1.4.46
并在提交时开始出现以下异常:
sqlalchemy.exc.IllegalStateChangeError: Method 'commit()' can't be called here; method '_connection_for_bind()' is already in progress and this would cause an unexpected state change to <SessionTransactionState.CLOSED: 5>
在更新到新版本之前,它运行良好。
# -*- coding:utf-8 -*-
from sqlalchemy import exc, event, text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.pool import NullPool, Pool
from contextvars import ContextVar
from sanic import Sanic
import asyncio
class EngineNotInitialisedError(Exception):
pass
class DBSessionContext:
def __init__(self, session: Session, commit_on_exit: bool = True) -> None:
self.session = session
self._query = None
self.commit_on_exit = commit_on_exit
self.token = None
async def close(self, exc_type=None, exc_value=None, traceback=None):
if self._query:
if exc_value and getattr(exc_value, 'status_code', 500) > 300:
await self._query.rollback()
self._post_processing.clear()
else:
await self._query.commit()
await self.run_post_processing()
await self._query.close()
if self._post_processing:
await self.run_post_processing()
def set_token(self, token):
self.token = token
@property
def query(self) -> Session:
if not self._query:
self._query = self.session()
return self._query
class AsyncSession(SQLAlchemyAsyncSession):
async def execute(self, statement, **parameters):
try:
if isinstance(statement, str):
# We wrap around the `text()` method automatically
statement = text(statement)
return await super().execute(statement, parameters)
except exc.OperationalError as e:
if e.orig.args[0] == 1205:
# Lock wait timeout exceeded
await self.rollback()
return await super().execute(statement, parameters)
raise e
class DBSession:
def __init__(self):
self.engine = None
self.session = None
self._session = None
self.context = ContextVar("context", default=None)
def init_app(self, app: Sanic, url: str, commit_on_exit: bool = True) -> None:
self.commit_on_exit = commit_on_exit
engine_args = {
'echo': app.config.get('DATABASE_ECHO', cast=bool, default=False),
'echo_pool': app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
'poolclass': NullPool, # will be used to create a connection pool instance using the connection parameters given in the URL
# if pool_class is not NullPool:
# the number of connections to allow in connection pool “overflow”
# 'max_overflow': app.config.get('DATABASE_MAX_OVERFLOW', cast=int, default=10),
# if True will enable the connection pool “pre-ping” feature that tests connections for liveness upon each checkout
# 'pool_pre_ping': app.config.get('DATABASE_POOL_PRE_PING', cast=bool, default=True),
# the number of connections to keep open inside the connection pool
# 'pool_size': app.config.get('DATABASE_POOL_SIZE', cast=int, default=5),
# this setting causes the pool to recycle connections after the given number of seconds has passed
# 'pool_recycle': app.config.get('DATABASE_POOL_RECYCLE', cast=int, default=-1),
# number of seconds to wait before giving up on getting a connection from the pool
# 'pool_timeout': app.config.get('DATABASE_POOL_TIMEOUT', cast=int, default=3600),
}
self.engine = create_async_engine(
url,
**engine_args
)
self.session = sessionmaker(
bind=self.engine,
expire_on_commit=False,
class_=AsyncSession,
autoflush=False
)
async def __aenter__(self):
if not isinstance(self.engine, AsyncEngine):
raise EngineNotInitialisedError
session_ctx = DBSessionContext(self.session, self.commit_on_exit)
session_ctx.set_token(self.context.set(session_ctx))
return session_ctx
async def __aexit__(self, exc_type, exc_value, traceback):
session_ctx = self.context.get()
await asyncio.shield(session_ctx.close(exc_type, exc_value, traceback))
self.context.reset(session_ctx.token)
@property
def query(self) -> Session:
return self.context.get().query
@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
'''Listener for Pool checkout events that pings every connection before using.
Implements pessimistic disconnect handling strategy. See also:
http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic'''
cursor = dbapi_con.cursor()
try:
cursor.execute("SELECT 1")
except exc.OperationalError as ex:
if ex.args[0] in (2006, # MySQL server has gone away
2013, # Lost connection to MySQL server during query
2055): # Lost connection to MySQL server at '%s', system error: %d
raise exc.DisconnectionError() # caught by pool, which will retry with a new connection
else:
raise
cursor.close()
db = DBSession()
代码调用如下:
async with db:
await db.query.execute('INSERT INTO ...')
是什么导致了我遇到的 InvalidStateChangeError?如何避免这个问题?
最佳答案
SQLAlchemy 的 Github 存储库上有一个讨论,给出了问题发生的原因: https://github.com/sqlalchemy/sqlalchemy/discussions/9312
建议是代码正在调用类似的内容
asyncio.gather(func(session), func2(session)
两个函数共享同一 session ,这会导致 sqlalchemy.exc.IllegalStateChangeError
删除 asyncio.gather
调用可以解决该问题。 (或者使用两个 session ,每个函数一个)。
关于SQLAlchemy:方法 '_connection_for_bind()' 已在进行中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75758327/
我最近将 SQLAlchemy(带有 [asyncio] 包)更新到 1.4.46 并在提交时开始出现以下异常: sqlalchemy.exc.IllegalStateChangeError: Met
我是一名优秀的程序员,十分优秀!