gpt4 book ai didi

python - 在 Flask 应用程序中获取 Celery Group 结果

转载 作者:行者123 更新时间:2023-12-03 17:02:26 25 4
gpt4 key购买 nike

我正在使用 Flask,设置了一个简单的应用程序,并且正在尝试学习如何有效地利用 celery 来完成工作,并在查询时显示结果。

在最基本的例子中,我创建了一个任务,通过 task.delay(args) .然后,这个对象让我为作业提取一个 ID,稍后我可以通过点击不同的端点来查询它。简单的。

我的目标是模仿这一点,尽管利用群体。阅读文档,我发现组原语是懒惰的,所以我必须在保存它之前实际调用它。

我的问题肯定来自缺乏理解,但基本上是:
如果我的目标是能够通过 Flask Pipeline 异步并行地运行多组后台任务,那么我该如何检索 .join()给定以下限制的组的结果

  • 请求 Endpoint1(可能返回一个 ID)
  • 请求 Endpotin2,传递 ID 以返回完成工作的结果

  • 这是正确的方法吗?还是我应该考虑不同的心态?

    伪代码:
    # From my apps init
    celery_instance = Celery("module.modulename", backend = app.config['CELERY_RESULT_BACKEND'], broker = app.config['CELERY_BROKER_URL'])
    celery_instance.conf.update(app.config)

    <snip>

    from celery import group
    from app import celery_instance

    @app.route("/status/domain/<id>", methods=['GET'])
    def query(id):
    # Works for single job, not job group
    result = celery_instance.AsyncResult(id)
    ...

    @app.route("/query/domain/<domain>", methods=['GET'])
    def query_by_domain(domain):
    ...
    job = group([task1.delay(domain), task2.delay(domain)])
    return redirect(url_for('app.query', id=job.id), code=302)

    最佳答案

    您需要将组结果保存为通过 GroupResult.save()方法:

    @app.route("/query/domain/<domain>", methods=['GET'])
    def query_by_domain(domain):
    ...
    job = group([task1.delay(domain), task2.delay(domain)])
    <b>job.save()</b>
    return redirect(url_for('app.query', id=job.id), code=302)

    然后你可以得到 GroupResult如通过 GroupResult.restore()方法。
    @app.route("/status/domain/<id>", methods=['GET'])
    def query(id):
    # Works for single job, not job group
    <b>result = celery_instance.GroupResult.restore(group_id)</b>
    ...

    关于python - 在 Flask 应用程序中获取 Celery Group 结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33611235/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com