gpt4 book ai didi

SQLAlchemy:方法 '_connection_for_bind()' 已在进行中

转载 作者:行者123 更新时间:2023-12-03 07:57:11 42 4
gpt4 key购买 nike

我最近将 SQLAlchemy(带有 [asyncio] 包)更新到 1.4.46 并在提交时开始出现以下异常:

sqlalchemy.exc.IllegalStateChangeError: Method 'commit()' can't be called here; method '_connection_for_bind()' is already in progress and this would cause an unexpected state change to <SessionTransactionState.CLOSED: 5>

在更新到新版本之前,它运行良好。

# -*- coding:utf-8 -*-

from sqlalchemy import exc, event, text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.pool import NullPool, Pool
from contextvars import ContextVar
from sanic import Sanic
import asyncio


class EngineNotInitialisedError(Exception):
    """Raised when a ``DBSession`` is entered before its engine exists."""


class DBSessionContext:
    """Hold one lazily created database session for a single unit of work.

    No session is opened until :attr:`query` is first read. :meth:`close`
    then commits or rolls back that session and closes it.
    """

    def __init__(self, session: "sessionmaker", commit_on_exit: bool = True) -> None:
        """Store the session factory without opening a session.

        Args:
            session: Zero-argument callable that returns a new session; it is
                called once, on first access to :attr:`query`.
            commit_on_exit: Stored on the instance as ``commit_on_exit``.
        """
        self.session = session
        self._query = None
        # NOTE(review): close() commits regardless of this flag; confirm
        # whether commit_on_exit=False is meant to skip the commit.
        self.commit_on_exit = commit_on_exit
        self.token = None
        # Callbacks queued to run after a successful commit. close() reads
        # this list, so it must exist even when nothing was queued.
        self._post_processing = []

    async def close(self, exc_type=None, exc_value=None, traceback=None):
        """Finish the unit of work and release the session.

        If a session was opened: roll back and drop the queued callbacks when
        ``exc_value`` is set and its ``status_code`` (500 when the attribute
        is missing) is above 300; otherwise commit and run the callbacks.
        The session is closed in either case, even if commit/rollback raises.

        Args:
            exc_type: Exception class from the enclosing ``async with`` (unused).
            exc_value: Exception instance from the enclosing ``async with``.
            traceback: Traceback from the enclosing ``async with`` (unused).
        """
        session = self._query
        if session is not None:
            try:
                failed = (
                    bool(exc_value)
                    and getattr(exc_value, 'status_code', 500) > 300
                )
                if failed:
                    await session.rollback()
                    self._post_processing.clear()
                else:
                    await session.commit()
                    await self.run_post_processing()
            finally:
                # Always release the connection, including when the commit
                # or rollback above raised.
                await session.close()

        # Callbacks queued without a session ever being opened still run.
        if self._post_processing:
            await self.run_post_processing()

    async def run_post_processing(self):
        """Run and discard every queued callback, in insertion order.

        Each callback is called with no arguments; if it returns an
        awaitable, that awaitable is awaited before the next callback runs.
        """
        import inspect

        while self._post_processing:
            hook = self._post_processing.pop(0)
            outcome = hook()
            if inspect.isawaitable(outcome):
                await outcome

    def set_token(self, token):
        """Remember ``token`` on the instance as ``self.token``."""
        self.token = token

    @property
    def query(self) -> "AsyncSession":
        """Return this context's session, creating it on first access."""
        if self._query is None:
            self._query = self.session()

        return self._query


class AsyncSession(SQLAlchemyAsyncSession):
    """Async session that accepts raw SQL strings and retries lock timeouts."""

    async def execute(self, statement, **parameters):
        """Execute ``statement``, retrying once after a lock-wait timeout.

        Args:
            statement: An executable statement, or a plain ``str`` which is
                wrapped in ``text()`` automatically.
            **parameters: Passed to the parent ``execute`` as the parameter
                dictionary.

        Returns:
            Whatever the parent ``execute`` returns.

        Raises:
            sqlalchemy.exc.OperationalError: For any operational error other
                than code 1205, or if the retry fails as well.
        """
        if isinstance(statement, str):
            # We wrap around the `text()` method automatically
            statement = text(statement)

        try:
            return await super().execute(statement, parameters)
        except exc.OperationalError as e:
            # Not every driver error carries a numeric code in args[0];
            # read it defensively so the original error is never masked.
            driver_args = getattr(e.orig, 'args', ())
            if not driver_args or driver_args[0] != 1205:
                raise

            # 1205: Lock wait timeout exceeded.
            # NOTE(review): the rollback discards everything done earlier in
            # the current transaction before the statement is retried.
            await self.rollback()
            return await super().execute(statement, parameters)


class DBSession:
    """Async context manager that scopes one ``DBSessionContext`` per entry.

    The active context is stored in a ``ContextVar``. Tasks started inside
    the ``async with`` block (for example through ``asyncio.gather``) copy
    the current context and therefore share the same session; running
    statements on it concurrently is what triggers SQLAlchemy's
    ``IllegalStateChangeError``. Await such calls one after another, or give
    each task its own ``async with`` block.
    """

    def __init__(self):
        """Create an unconfigured instance; call :meth:`init_app` before use."""
        self.engine = None
        self.session = None
        self._session = None
        # Same default as init_app(), so the attribute always exists.
        self.commit_on_exit = True
        self.context = ContextVar("context", default=None)

    def init_app(self, app: "Sanic", url: str, commit_on_exit: bool = True) -> None:
        """Create the async engine and the session factory.

        Args:
            app: Application whose ``config`` supplies the echo settings.
            url: Database URL handed to ``create_async_engine``.
            commit_on_exit: Forwarded to every ``DBSessionContext`` created
                by ``__aenter__``.
        """
        self.commit_on_exit = commit_on_exit

        engine_args = {
            'echo': app.config.get('DATABASE_ECHO', cast=bool, default=False),
            'echo_pool': app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
            # will be used to create a connection pool instance using the
            # connection parameters given in the URL
            'poolclass': NullPool,
            # Options for when poolclass is not NullPool:
            #
            # the number of connections to allow in connection pool "overflow"
            # 'max_overflow': app.config.get('DATABASE_MAX_OVERFLOW', cast=int, default=10),
            # if True will enable the connection pool "pre-ping" feature that
            # tests connections for liveness upon each checkout
            # 'pool_pre_ping': app.config.get('DATABASE_POOL_PRE_PING', cast=bool, default=True),
            # the number of connections to keep open inside the connection pool
            # 'pool_size': app.config.get('DATABASE_POOL_SIZE', cast=int, default=5),
            # this setting causes the pool to recycle connections after the
            # given number of seconds has passed
            # 'pool_recycle': app.config.get('DATABASE_POOL_RECYCLE', cast=int, default=-1),
            # number of seconds to wait before giving up on getting a
            # connection from the pool
            # 'pool_timeout': app.config.get('DATABASE_POOL_TIMEOUT', cast=int, default=3600),
        }

        self.engine = create_async_engine(
            url,
            **engine_args
        )

        self.session = sessionmaker(
            bind=self.engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=False
        )

    async def __aenter__(self):
        """Open a new session context and make it the current one.

        Returns:
            The ``DBSessionContext`` now stored in the context variable.

        Raises:
            EngineNotInitialisedError: If ``self.engine`` is not an
                ``AsyncEngine`` (``init_app`` has not been called).
        """
        if not isinstance(self.engine, AsyncEngine):
            raise EngineNotInitialisedError

        session_ctx = DBSessionContext(self.session, self.commit_on_exit)
        session_ctx.set_token(self.context.set(session_ctx))

        return session_ctx

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Close the current session context and restore the previous one.

        The close is wrapped in ``asyncio.shield``. The context variable is
        reset even when closing raises, so a failed commit cannot leave a
        stale session context behind.
        """
        session_ctx = self.context.get()
        try:
            await asyncio.shield(session_ctx.close(exc_type, exc_value, traceback))
        finally:
            self.context.reset(session_ctx.token)

    @property
    def query(self) -> "AsyncSession":
        """Return the session of the current context.

        Raises:
            AttributeError: If read outside an ``async with`` block, where
                the context variable still holds its default of ``None``.
        """
        return self.context.get().query


@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
    """Ping a connection with ``SELECT 1`` every time it is checked out.

    Implements the pessimistic disconnect handling strategy. See also:
    http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic

    Args:
        dbapi_con: The connection being checked out; a cursor is opened on it.
        con_record: Unused.
        con_proxy: Unused.

    Raises:
        sqlalchemy.exc.DisconnectionError: When the ping fails with one of
            the MySQL "connection lost" codes (2006, 2013, 2055).
        sqlalchemy.exc.OperationalError: Any other operational error,
            re-raised unchanged.
    """
    disconnect_codes = (
        2006,  # MySQL server has gone away
        2013,  # Lost connection to MySQL server during query
        2055,  # Lost connection to MySQL server at '%s', system error: %d
    )

    cursor = dbapi_con.cursor()
    try:
        cursor.execute("SELECT 1")
    except exc.OperationalError as ex:
        # NOTE(review): the statement runs on the raw DBAPI cursor, which
        # presumably raises the driver's own exception class rather than
        # sqlalchemy's exc.OperationalError -- confirm this handler can match.
        code = ex.args[0] if ex.args else None
        if code in disconnect_codes:
            # caught by pool, which will retry with a new connection
            raise exc.DisconnectionError() from ex
        raise

    cursor.close()


# Shared module-level DBSession; init_app() must be called on it before it is
# entered with `async with db`, otherwise EngineNotInitialisedError is raised.
db = DBSession()

代码调用如下:

async with db:
await db.query.execute('INSERT INTO ...')

是什么导致了我遇到的 IllegalStateChangeError?如何避免这个问题?

最佳答案

SQLAlchemy 的 Github 存储库上有一个讨论,给出了问题发生的原因: https://github.com/sqlalchemy/sqlalchemy/discussions/9312

该讨论指出,问题的原因是代码中存在类似下面的调用:

asyncio.gather(func(session), func2(session)):两个函数共享同一个 session,这会导致 sqlalchemy.exc.IllegalStateChangeError。

删除 asyncio.gather 调用(改为依次 await)可以解决该问题;或者使用两个 session,每个函数一个。原因是同一个 AsyncSession 不能被多个并发任务同时使用。

关于SQLAlchemy:方法 '_connection_for_bind()' 已在进行中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75758327/

42 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com