gpt4 book ai didi

python - 以可测试的方式连接到 mongodb

转载 作者:可可西里 更新时间:2023-11-01 09:12:37 26 4
gpt4 key购买 nike

我计划使用 Flask 和 MongoDB(可能还有作为 ODM 的 Ming)在 python 中编写一个 webapp。问题是我想让我的模型和 Controller 很好地分开,原因之一是能够在单独的组件上运行简单的单元测试。

现在这是我的问题,在请求生命周期的某个时刻,我需要连接到 MongoDB。每个请求将有一个单独的连接。 Flask 提供了一个线程局部对象,它可以包含对请求而言全局 的任何变量,这似乎是放置 mongo 连接的好地方。但是,这会在数据层和 Flask 之间产生硬依赖,这将使单独测试或运行它们变得非常困难。

所以我的问题真的是是否有一个优雅的解决方案。我自己想出了几个选项,但它们远非优雅。

首先,我可以只给数据模块一个函数,告诉它从哪里获取连接对象。或者类似地向它传递一个可用于获取新连接的函数。

第二个选项是创建一个模块可以用来连接 MongoDB 的类,然后创建这个类的两个版本,一个使用 Flask 的全局对象,另一个直接连接到 MongoDB。

在我看来,这两者都不是很健壮或优雅,有什么办法可以做得更好吗?

最佳答案

一种方法是使用 Python 模块级单例模式。创建一个具有“conn”对象的模块,(仅使用普通的 PyMongo)

try:
conn = Connection(max_pool_size=20)
except ConnectionFailure:
app.logger.critical("Unable to connect to MongoDB")

然后为 PyMongo 集合创建一个包装器

class Model(object):
def __init__(self, table, db = app.config['DB_NAME']):
self._table = table
self._db = db

def find_one(self, spec_or_id=None, *args, **kwargs):
result = None
try:
result = conn[self._db][self._table].find_one(spec_or_id, *args, **kwargs)
except InvalidName:
app.logger.critical('invalid DB or Table name')
finally:
conn.end_request()

return result

此处 conn.end_request() 将导致连接返回到池中,并且在每个 find_one() 上它将从池中获取连接。别担心,它们是线程安全的。

现在你可以像这样使用模型了

result = Model(COLLECTION).find_one({user:'joe'})

关于python - 以可测试的方式连接到 mongodb,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8543730/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com