gpt4 book ai didi

python - 用于谷歌应用引擎 python 的迷你可迭代模型映射器

转载 作者:行者123 更新时间:2023-11-28 18:54:29 25 4
gpt4 key购买 nike

我不认为我想要的存在。任何人都可以为我创建一个准系统迷你映射器类吗?详细的伪代码或实际的 python 都可以。 更新:帖子底部的简单工作版本。

更新 2 - 6 月 20 日:

  • 更安全的执行:iterall() 中的 continue/break/return 现在可以工作了。
  • 添加了 defer_db 标志,用于将数据库放置和删除发送到任务队列
  • 出于抽象目的,可以指定每个实体必须通过的过滤函数,否则在迭代时不会返回。
  • 将 .bdelete() 更改为 .bdel()

更新 3 - 6 月 21 日:

  • 修复了上次更新中引入的阻止保存的主要错误。

我希望这对我以外的人有用。我经常使用它,它是介于 MapReduce 和你只需要游标的中间,因为你不确定你必须处理多少结果。


这是关于什么的?

gae 的 mapreduce 库很棒,但我想要一些轻量级和一次性的东西。在 python gae 的教程中,您经常会看到数据库模型正在迭代、修改和保存。我不认为还有更多这样的例子,因为我们知道这是非常低效的,并且为每个循环而不是批处理调用一次数据存储。不过我喜欢这个界面,而且我经常发现自己需要一种简单快速的方法来运行我的数据库模型。

它会是什么样子?

用法

  1. 导入类。
  2. 告诉它你想映射什么模型
  3. 为其提供可选的查询过滤器
  4. 获取迭代器对象
  5. 循环离开,安全地知道您不会进行数千次不必要的数据库调用。

幕后花絮

这就是我需要你帮助的地方,因为我觉得我无法自拔。

生成器 (我从未使用过生成器,只是有点了解它们) object batch-grabs datastore items(有多少是安全的?是否有硬性限制或是否取决于项目大小?)并以可迭代的方式呈现它们。达到 MAX_AMOUNT batch_size 后,将项目批量保存到数据存储并无缝抓取下一批(光标)。

我正在考虑的一件事是使用 defer 将项目保存到数据库,目的是在我们循环许多项目时节省一些时间。可能的缺点可能是下一部分代码预计 map 已经完成。所以我认为根据用户偏好设置或忽略“defer_db”标志会很好。如果您只需要少量项目,则不会设置延迟标志。

结论

请用代码概念为这个小项目做出贡献。接受的答案将是一周后投票最多的答案。不可否认,要求 SO 为我提出解决方案让我觉得有点脏,但说实话,我觉得自己无法胜任这项任务。 希望你觉得它有用。

例子

相同的查询函数

country_mim = MIM(CountryModels.all()).filter("spoken_language =", "French")
country_mim.order("population")

嵌套迭代

some_mim = MIM(SomeModel.all())
for x in some_mim.iterall():
if x.foo == 'ham sandwich':
sandwich_mim = MIM(MySandwiches.all())
for sandwich in sandwich_mim.iterall():
if 'ham' in sandwich.ingredients:
print 'yay'

批量保存和删除

country_mim = MIM(CountryModels.all()).order("drinking_age")
for country in country_mim.iterall():
if country.drinking_age > 21: # these countries should be nuked from orbit
country_mim.bdel(country) # delete
if country.drinking_age == 18:
country.my_thoughts = "god bless you foreigners"
country_mim.bput(country) # save
if country.drinking_age < 10: # panic
country.my_thoughts = "what is this i don't even..."
country_mim.bput(country)
break # even though we panicked, the bput still resolves

部分代码:MiniIterMapper.py

我已经使用此代码几个 周了,一切似乎都很好。 Defer 没有被包含在内。 查询门面代码是从伟大的 PagedQuery 窃取的(经许可)模块。支持批量保存和批量删除。

import google.appengine.ext.db as db
from google.appengine.ext.deferred import defer

class MIM(object):
"""
All standard Query functions (filter, order, etc) supported*. Default batch
size is 100. defer_db=True will cause put and delete datastore operations to
be deferred. allow_func accepts any function you wish and only the entities
that cause the function to return a true value will be returned during
iterall(). Using break/continue/return while iterating doesn't cause things
to explode (like it did in the 1st version).

* - thanks to http://code.google.com/p/he3-appengine-lib/wiki/PagedQuery
"""

def __init__(self, query, batch_size=100, defer_db=False, allow_func=None):

self._query = query
self._batch_size = batch_size
self._defer_db = defer_db
self._allow_func = allow_func
self._to_save = []
self._to_delete = []

# find out if we are dealing with another facade object
if query.__dict__.has_key('_query'): query_to_check = query._query
else: query_to_check = query

if isinstance(query_to_check, db.Query): self._query_type = 'Query'
elif isinstance(query_to_check, db.GqlQuery): self._query_type = 'GqlQuery'
else: raise TypeError('Query type not supported: ' + type(query).__name__)

def iterall(self):
"Return iterable over all datastore items matching query. Items pulled from db in batches."

results = self._query.fetch(self._batch_size) # init query
savedCursor = self._query.cursor() # init cursor

try:
while results:

for item in results:
if self._allow_func:
if self._allow_func(item):
yield item
else:
yield item

if len(results) == self._batch_size:
results = self._query.with_cursor(savedCursor).fetch(self._batch_size)
savedCursor = self._query.cursor()

else: # avoid additional db call if we don't have max amount
results = [] # while loop will end, and go to else section.
else:
self._finish()
except GeneratorExit:
self._finish()

def bput(self, item):
"Batch save."
self._to_save.append(item)
if len(self._to_save) >= self._batch_size:
self._bput_go()

def bdel(self, item):
"Batch delete."
self._to_delete.append(item)
if len(self._to_delete) >= self._batch_size:
self._bdel_go()

def _bput_go(self):
if self._defer_db:
defer(db.put, self._to_save)
else: db.put(self._to_save)
self._to_save = []

def _bdel_go(self):
if self._defer_db:
defer(db.delete, self._to_delete)
else: db.delete(self._to_delete)
self._to_delete = []

def _finish(self):
"When done iterating through models, could be that the last few remaining weren't put/deleted yet."
if self._to_save: self._bput_go()
if self._to_delete: self._bdel_go()

# FACADE SECTION >>>

def fetch(self, limit, offset=0):
return self._query.fetch(limit,offset)

def filter(self, property_operator, value):
self._check_query_type_is('Query')
self._query = self._query.filter(property_operator, value)
return self

def order(self, property):
self._check_query_type_is('Query')
self._query.order(property)
return self

def ancestor(self, ancestor):
self._check_query_type_is('Query')
self._query.ancestor(ancestor)
return self

def count(self, limit=1000):
return self._query.count(limit)

def _check_query_type_is(self, required_query_type):
if self._query_type != required_query_type:
raise TypeError('Operation not allowed for query type ('\
+ type(self._query).__name__)

最佳答案

您为什么不想使用 Mapreduce?它正是为这个用例而设计的,已经做了你想要的一切,并且可以通过编程方式调用。 “轻量级”是一个非常模糊的术语,但我不知道 mapreduce 库不完全适合您的任务的任何原因 - 而且几乎没有理由复制该功能。

关于python - 用于谷歌应用引擎 python 的迷你可迭代模型映射器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5737796/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com