gpt4 book ai didi

python - 这是从 Twisted 进行线程化 SQLAlchemy 查询的可接受方式吗?

转载 作者:太空宇宙 更新时间:2023-11-03 11:05:17 25 4
gpt4 key购买 nike

我一直在阅读有关在 Twisted 应用程序上下文中使用 SQLAlchemy 的 ORM 的一些资料。有很多信息需要消化,所以我在将所有部分放在一起时遇到了一些麻烦。到目前为止,我收集了以下绝对真理:

  1. 一个 session 意味着一个线程。总是。
  2. 默认情况下,
  3. scoped_session 为我们提供了一种将 session 限制到给定线程的方法。换句话说,我确信通过使用 scoped_session,我不会将 session 传递给其他线程(除非我明确地这样做,但我不会这样做)。

我还收集到一些与延迟/急切加载相关的问题,一种可能的方法是将 ORM 对象与 session 分离,并在更改线程时将它们重新附加到另一个 session 。我对细节很模糊,但我也得出结论,scoped_session 使其中的许多点都没有实际意义。

我的第一个问题是我的上述结论是否严重错误

除此之外,我还设计了这种方法,希望它能令人满意。

我首先创建一个 scoped_session 对象...

Session = scoped_session(sessionmaker(bind=_my_engine))

...然后我将从上下文管理器中使用它,以便优雅地处理异常和清理:

@contextmanager
def transaction_context():
session = Session()
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
session.remove() # dispose of the session

现在我需要做的就是在延迟到单独线程的函数中使用上述上下文管理器。我拼凑了一个装饰器来让事情变得更漂亮:

def threaded(fn):
@wraps(fn) # functools.wraps
def wrapper(*args, **kwargs):
return deferToThread(fn, *args, **kwargs) # t.i.threads.deferToThread
return wrapper

这是我打算如何使用整个 shebang 的示例。下面是一个使用 SQLAlchemy ORM 执行数据库查找的函数:

@threaded
def get_some_attributes(group):
with transaction_context() as session:
return session.query(Attribute).filter(Attribute.group == group)

我的第二个问题是这种方法是否可行。

  • 我是否做出了根本性错误的假设?
  • 有什么注意事项吗?
  • 有没有更好的方法?

编辑: Here是关于上下文管理器中意外错误的相关问题。

最佳答案

现在我正在处理这个确切的问题,我想我找到了解决方案。

确实,您必须将所有数据库访问功能推迟到一个线程。但是在您的解决方案中,您在查询数据库后删除了 session ,因此所有结果 ORM 对象都将被分离,您将无法访问它们的字段。

你不能使用 scoped_session 因为在 Twisted 中我们只有一个 MainThread(除了在 deferToThread 中工作的东西)。但是,我们可以将 scoped_sessionscopefunc 一起使用。

在 Twisted 中有一个很棒的东西叫做 ContextTracker:

provides a way to pass arbitrary key/value data up and down a call stack without passing them as parameters to the functions on that call stack.

在方法 render_GET 中我扭曲的网络应用程序中,我设置了一个 uuid 参数:

call = context.call({"uuid": str(uuid.uuid4())}, self._render, request)

然后我调用 _render 方法来完成实际工作(使用数据库、渲染 html 等)。

我这样创建 scoped_session:

scopefunc = functools.partial(context.get, "uuid")
Session = scoped_session(session_factory, scopefunc=scopefunc)

现在,在 _render 的任何函数调用中,我都可以获得 session :

Session()

_render 结束时,我必须执行 Session.remove() 来删除 session 。

它适用于我的网络应用程序,我认为它可以用于其他任务。

这是一个完全独立的示例,展示了它们如何协同工作。

from twisted.internet import reactor, threads
from twisted.web.resource import Resource
from twisted.web.server import Site, NOT_DONE_YET
from twisted.python import context
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base
import uuid
import functools

engine = create_engine(
'sqlite:///test.sql',
connect_args={'check_same_thread': False},
echo=False)

session_factory = sessionmaker(bind=engine)
scopefunc = functools.partial(context.get, "uuid")
Session = scoped_session(session_factory, scopefunc=scopefunc)
Base = declarative_base()


class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)

Base.metadata.create_all(bind=engine)


class TestPage(Resource):
isLeaf = True

def render_GET(self, request):
context.call({"uuid": str(uuid.uuid4())}, self._render, request)
return NOT_DONE_YET

def render_POST(self, request):
return self.render_GET(request)

def work_with_db(self):
user = User(name="TestUser")
Session.add(user)
Session.commit()
return user

def _render(self, request):
print "session: ", id(Session())
d = threads.deferToThread(self.work_with_db)

def success(result):
html = "added user with name - %s" % result.name
request.write(html.encode('UTF-8'))
request.finish()
Session.remove()
call = functools.partial(context.call, {"uuid": scopefunc()}, success)
d.addBoth(call)
return d

if __name__ == "__main__":
reactor.listenTCP(8888, Site(TestPage()))
reactor.run()

我打印出 session 的 ID,您可以看到它对于每个请求都是不同的。如果您从 scoped_session 构造函数中删除 scopefunc 并同时执行两个请求(将 time.sleep 插入 work_with_db),您将为此获得一个公共(public) session 两个请求。

The scoped_session object by default uses threading.local() as storage, so that a single Session is maintained for all who call upon the scoped_session registry, but only within the scope of a single thread

这里的一个问题是在 twisted 中我们只有一个线程来处理所有请求。这就是为什么我们必须创建自己的 scopefunc,这将显示请求之间的差异。

另一个问题是,twisted 没有将上下文传递给回调,我们必须包装回调并将当前上下文发送给它。

call = functools.partial(context.call, {"uuid": scopefunc()}, success)

我仍然不知道如何让它与 defer.inLineCallback 一起工作,我在我的代码中无处不在。

关于python - 这是从 Twisted 进行线程化 SQLAlchemy 查询的可接受方式吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21076105/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com