gpt4 book ai didi

python - 在 Flask 中运行后台 Celery 任务

转载 作者:行者123 更新时间:2023-12-01 05:38:58 28 4
gpt4 key购买 nike

问题已更新,包括取得的进展

我有以下代码,我的 celery 任务启动正常,我只是不知道应该在哪里存储异步结果,以便稍后再查看

#!/usr/bin/env python

"""Page views."""

from flask import render_template, request
from flask import Flask

from celerytest import add

from time import sleep


app = Flask(__name__)

async_res = []

@app.route('/', methods=['GET', 'POST'])
def run():
if request.method == 'GET':
return render_template("template.html")
else:
form = request.form
n1 = str(form.get("n1"))
n2 = str(form.get("n2"))
aysnc_res = add.delay(n1,n2)
return render_template("loading.html")

@app.route('/loading')
def check_if_complete():
if async_res.ready() == True:
return render_template("template2.html", val=async_res.get())
else:
sleep(5)
return render_template("loading.html")


if __name__ == '__main__':
app.run()

看来,将 async_res 存储为我的 Flask 应用程序中的全局变量会导致服务器错误。那么存储结果的最佳方法是什么,以便我可以在“加载页面”中检查它

最佳答案

我最终能够将任务 ID 保存在 Flask 的 session 字典中

参见下面的代码:

#!/usr/bin/env python

"""Page views."""

from flask import render_template, request
from flask import Flask

from celerytest import add

from time import sleep


app = Flask(__name__)

@app.route('/', methods=['GET', 'POST'])
def run():
if request.method == 'GET':
return render_template("template.html")
else:
form = request.form
n1 = str(form.get("n1"))
n2 = str(form.get("n2"))
aysnc_res = add.delay(n1,n2)
session['TASK_ID'] = async_res.id
return render_template("loading.html")

@app.route('/loading')
def check_if_complete():
aysnc_res = session['TASK_ID']
if async_res.ready() == True:
return render_template("template2.html", val=async_res.get())
else:
sleep(5)
return render_template("loading.html")


if __name__ == '__main__':
app.run()

关于python - 在 Flask 中运行后台 Celery 任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18132704/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com