gpt4 book ai didi

google-app-engine - 从数据存储中查询大量 ndb 实体的最佳实践

转载 作者:太空宇宙 更新时间:2023-11-03 15:17:22 24 4
gpt4 key购买 nike

我在 App Engine 数据存储区遇到了一个有趣的限制。我正在创建一个处理程序来帮助我们分析其中一台生产服务器上的一些使用数据。为了执行分析,我需要查询和汇总从数据存储中提取的 10,000 多个实体。计算并不难,它只是通过使用样本的特定过滤器的项目的直方图。我遇到的问题是,在达到查询截止日期之前,我无法以足够快的速度从数据存储中取回数据以进行任何处理。

我已经尝试了所有我能想到的将查询分块到并行 RPC 调用中以提高性能的方法,但根据 appstats 我似乎无法让查询实际并行执行。无论我尝试什么方法(见下文),RPC 似乎总是退回到一个连续的下一个查询的瀑布。

注意:查询和分析代码确实有效,它只是运行缓慢,因为我无法从数据存储中足够快地获取数据。

背景

我没有可以共享的实时版本,但这是我正在谈论的系统部分的基本模型:

class Session(ndb.Model):
""" A tracked user session. (customer account (company), version, OS, etc) """
data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
name = ndb.StringProperty (required = True, indexed = True)
session = ndb.KeyProperty (required = True, kind = Session)
timestamp = ndb.DateTimeProperty(required = True, indexed = True)
tags = ndb.StringProperty (repeated = True, indexed = True)

您可以将示例视为用户使用给定名称的功能的时间。 (例如:'systemA.feature_x')。标签基于客户详细信息、系统信息和功能。例如:['winxp'、'2.5.1'、'systemA'、'feature_x'、'premium_account'])。因此,标签形成了一组非规范化的标记,可用于查找感兴趣的样本。

我正在尝试进行的分析包括获取日期范围并询问每个客户帐户(公司,而不是每个用户)每天(或每小时)使用的一组功能(可能是所有功能)的功能有多少次。

所以处理程序的输入是这样的:
  • 开始日期
  • 结束日期
  • 标签

  • 输出将是:
    [{
    'company_account': <string>,
    'counts': [
    {'timeperiod': <iso8601 date>, 'count': <int>}, ...
    ]
    }, ...
    ]

    查询的通用代码

    以下是所有查询的一些通用代码。处理程序的一般结构是一个使用 webapp2 的简单获取处理程序,它设置查询参数、运行查询、处理结果、创建要返回的数据。
    # -- Build Query Object --- #
    query_opts = {}
    query_opts['batch_size'] = 500 # Bring in large groups of entities

    q = Sample.query()
    q = q.order(Sample.timestamp)

    # Tags
    tag_args = [(Sample.tags == t) for t in tags]
    q = q.filter(ndb.query.AND(*tag_args))

    def handle_sample(sample):
    session_obj = sample.session.get() # Usually found in local or memcache thanks to ndb
    count_key = session_obj.data['customer']
    addCountForPeriod(count_key, sample.timestamp)

    尝试过的方法

    我尝试了多种方法来尝试尽快并行地从数据存储中提取数据。到目前为止,我尝试过的方法包括:

    A. 单次迭代

    这更像是一个简单的基本案例,可以与其他方法进行比较。我只是构建查询并迭代所有项目,让 ndb 做它所做的事情,将它们一个接一个地拉出来。
    q = q.filter(Sample.timestamp >= start_time)
    q = q.filter(Sample.timestamp <= end_time)
    q_iter = q.iter(**query_opts)

    for sample in q_iter:
    handle_sample(sample)

    B. 大型提取

    这里的想法是看看我是否可以进行一次非常大的提取。
    q = q.filter(Sample.timestamp >= start_time)
    q = q.filter(Sample.timestamp <= end_time)
    samples = q.fetch(20000, **query_opts)

    for sample in samples:
    handle_sample(sample)

    C. 跨时间范围异步获取

    这里的想法是认识到样本在时间上的间隔相当好,所以我可以创建一组独立的查询,将整个时间区域分成块,并尝试使用异步并行运行每个查询:
    # split up timestamp space into 20 equal parts and async query each of them
    ts_delta = (end_time - start_time) / 20
    cur_start_time = start_time
    q_futures = []

    for x in range(ts_intervals):
    cur_end_time = (cur_start_time + ts_delta)
    if x == (ts_intervals-1): # Last one has to cover full range
    cur_end_time = end_time

    f = q.filter(Sample.timestamp >= cur_start_time,
    Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
    q_futures.append(f)
    cur_start_time = cur_end_time

    # Now loop through and collect results
    for f in q_futures:
    samples = f.get_result()
    for sample in samples:
    handle_sample(sample)

    D. 异步映射

    我尝试了这种方法,因为文档使它听起来像 ndb 在使用 Query.map_async 方法时可能会自动利用一些并行性。
    q = q.filter(Sample.timestamp >= start_time)
    q = q.filter(Sample.timestamp <= end_time)

    @ndb.tasklet
    def process_sample(sample):
    period_ts = getPeriodTimestamp(sample.timestamp)
    session_obj = yield sample.session.get_async() # Lookup the session object from cache
    count_key = session_obj.data['customer']
    addCountForPeriod(count_key, sample.timestamp)
    raise ndb.Return(None)

    q_future = q.map_async(process_sample, **query_opts)
    res = q_future.get_result()

    结果

    我测试了一个示例查询来收集总体响应时间和 appstats 跟踪。结果是:

    A. 单次迭代

    真实:15.645s

    这个顺序一个接一个地获取批次,然后从内存缓存中检索每个 session 。

    Method A appstats

    B. 大型提取

    真实:12.12s

    实际上与选项 A 相同,但由于某种原因要快一些。

    Method B appstats

    C. 跨时间范围异步获取

    真实:15.251s

    似乎在开始时提供了更多的并行性,但似乎在结果迭代期间通过对 next 的一系列调用而减慢了速度。似乎也无法将 session 内存缓存查找与挂起的查询重叠。

    Method C appstats

    D. 异步映射

    真实:13.752s

    这个对我来说是最难理解的。看起来它有很多重叠,但一切似乎都像瀑布一样拉伸(stretch)而不是平行。

    Method D appstats

    建议

    基于这一切,我错过了什么?我只是在 App Engine 上达到了限制,还是有更好的方法来并行下拉大量实体?

    我不知道接下来要尝试什么。我想过重写客户端以并行向应用程序引擎发出多个请求,但这似乎非常暴力。我真的希望应用引擎应该能够处理这个用例,所以我猜我遗漏了一些东西。

    更新

    最后我发现选项 C 最适合我的情况。我能够优化它以在 6.1 秒内完成。仍然不完美,但好多了。

    在听取了几个人的建议后,我发现以下几点是理解和记住的关键:
  • 多个查询可以并行运行
  • 一次只能运行 10 个 RPC
  • 尝试非规范化到没有辅助查询的程度
  • 这种类型的任务最好留给映射reduce和任务队列,而不是实时查询

  • 所以我做了什么让它更快:
  • 我根据时间从一开始就对查询空间进行了分区。 (注意:分区在返回的实体方面越相等越好)
  • 我进一步对数据进行了非规范化处理以消除对辅助 session 查询的需要
  • 我利用 ndb 异步操作和 wait_any() 将查询与处理重叠

  • 我仍然没有获得我期望或喜欢的性能,但现在它是可行的。我只是希望它们是一种更好的方法,可以在处理程序中将大量顺序实体快速拉入内存。

    最佳答案

    像这样的大型处理不应该在用户请求中完成,它有 60 秒的时间限制。相反,它应该在支持长时间运行的请求的上下文中完成。 task queue支持长达 10 分钟的请求,以及(我相信)正常的内存限制(F1 实例,默认情况下,有 128MB of memory )。对于更高的限制(无请求超时,1GB+ 内存),使用 backends .

    这里有一些尝试:设置一个 URL,在访问时触发任务队列任务。它返回一个网页,每约 5 秒轮询一次到另一个 URL,如果任务队列任务尚未完成,则该 URL 以 true/false 响应。任务队列处理数据,这可能需要 10 秒的时间,并将结果作为计算数据或呈现的网页保存到数据存储中。一旦初始页面检测到它已完成,用户将被重定向到该页面,该页面从数据存储中获取现在计算的结果。

    关于google-app-engine - 从数据存储中查询大量 ndb 实体的最佳实践,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11509368/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com