gpt4 book ai didi

python - Flask celery 应用程序与普通 Flask 应用程序(如何使它们同时工作)

转载 作者:行者123 更新时间:2023-12-03 17:00:05 25 4
gpt4 key购买 nike

我有一个 Flask Celery 应用程序,它实例化了 celery 实例。
我知道从 .py 文件的角度来看,我可以将普通的 Flask 路由添加到同一个 .py 文件,并且我需要运行相同的代码两次:

  • 运行 worker :

    % celery worker -A app.celery ...
  • 运行与普通 Flask 应用程序相同的代码:

    % python app.py ...

  • 我的问题是:如果普通的 Flask 应用程序是真正独立于 Celery 应用程序的进程,那么我如何从 Flask 路由操作正在运行的 celery 实例来执行以下操作:
      celery.control.purge()
    celery.control.inspect() etc ???

    这是我的代码:
    import os
    import random
    import time
    from flask import Flask, request, render_template, session, flash, redirect, \
    url_for, jsonify
    from celery import Celery

    app = Flask(__name__)

    # Celery configuration
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

    # Initialize Celery
    celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)


    @celery.task
    def send_async_email(msg):
    """Background task to send an email with Flask-Mail."""
    with app.app_context():
    mail.send(msg)


    @app.route('/purge', methods=['GET', 'POST'])
    def purge_tasks():
    ## want to do stuffs with the running celery instance, e.g:
    ## doing:
    ## celery.control.purge()
    ## celery.control.inspect()
    ##
    ## BUT HOW??

    if __name__ == '__main__':
    app.run(debug=True)

    我一直在互联网上寻找答案,但没有一个答案专门回答这个问题。

    非常感谢您的任何帮助/指点。

    最佳答案

    这个,http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#application ,看起来它会回答你的问题。阅读应用程序,然后调用任务。您需要做的是将您的 Flask 应用程序保存为文档中描述的“任务”,然后将该任务传递给 Celery 实例,顺便也称为“应用程序”。

    关于python - Flask celery 应用程序与普通 Flask 应用程序(如何使它们同时工作),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42167808/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com