gpt4 book ai didi

python - flask 中的后台任务

转载 作者:行者123 更新时间:2023-12-03 12:43:39 25 4
gpt4 key购买 nike

我正在编写一个 Web 应用程序,它会做一些繁重的工作。考虑到这一点,我想将任务设置为后台任务(非阻塞),这样其他请求就不会被之前的请求阻塞。
我将线程妖魔化,以便在主线程(因为我使用 threaded=True )完成后它不会退出,现在如果用户发送请求,我的代码将立即告诉他们他们的请求正在进行中,它'将在后台运行,并且应用程序已准备好为其他请求提供服务。
我当前的应用程序代码如下所示:

from flask import Flask
from flask import request
import threading

class threadClass:

def __init__(self):
thread = threading.Thread(target=self.run, args=())
thread.daemon = True # Daemonize thread
thread.start() # Start the execution

def run(self):

#
# This might take several minutes to complete
someHeavyFunction()

app = Flask(__name__)

@app.route('/start', methods=['POST'])
try:
begin = threadClass()
except:
abort(500)

return "Task is in progress"

def main():
"""
Main entry point into program execution

PARAMETERS: none
"""
app.run(host='0.0.0.0',threaded=True)

main()
我只是希望它能够处理一些并发请求(它不会在生产中使用)
我可以做得更好吗?我错过了什么吗?我正在浏览python的多线程包并找到了这个

multiprocessing is a package that supports spawning processes using anAPI similar to the threading module. The multiprocessing packageoffers both local and remote concurrency, effectively side-steppingthe Global Interpreter Lock by using subprocesses instead of threads.Due to this, the multiprocessing module allows the programmer to fullyleverage multiple processors on a given machine. It runs on both Unixand Windows.


我可以使用多处理来妖魔化进程吗?我怎样才能比线程模块做得更好?
##编辑
我通过了python的多处理包,它类似于线程。
from flask import Flask
from flask import request
from multiprocessing import Process

class processClass:

def __init__(self):
p = Process(target=self.run, args=())
p.daemon = True # Daemonize it
p.start() # Start the execution

def run(self):

#
# This might take several minutes to complete
someHeavyFunction()

app = Flask(__name__)

@app.route('/start', methods=['POST'])
try:
begin = processClass()
except:
abort(500)

return "Task is in progress"

def main():
"""
Main entry point into program execution

PARAMETERS: none
"""
app.run(host='0.0.0.0',threaded=True)

main()
上面的方法看起来不错吗?

最佳答案

最佳实践
在 flask 中实现后台任务的最佳方法是使用 Celery,如 this SO post 中所述。 .一个很好的起点是官方Flask documentationCelery documentation .
疯狂的方式:构建自己的装饰器
正如@MrLeeh 在评论中指出的那样,Miguel Grinberg 在他的 Pycon 2016 talk 中提出了一个解决方案。通过实现一个装饰器。我想强调的是,我对他的解决方案怀有最高的敬意;他自己称其为“疯狂的解决方案”。下面的代码是 his solution 的小改编.
警告!!!
不要在生产中使用它! 主要原因是这个应用有内存泄漏 通过使用全局 tasks字典。即使您解决了内存泄漏问题,维护这种代码也很困难。如果您只是想在私有(private)项目中玩耍或使用它,请继续阅读。
最小的例子
假设您的 /foo 中有一个长时间运行的函数调用。端点。我用 10 秒来模拟这个 sleep计时器。如果调用 enpoint 3 次,则需要 30 秒才能完成。
Miguel Grinbergs 装饰器解决方案在 flask_async 中实现.它在与当前 Flask 上下文相同的 Flask 上下文中运行一个新线程。每个线程都会发出一个新的 task_id .结果保存在全局字典中 tasks[task_id]['result'] .
使用装饰器后,您只需使用 @flask_async 装饰端点并且端点是异步的 - 就像那样!

import threading
import time
import uuid
from functools import wraps

from flask import Flask, current_app, request, abort
from werkzeug.exceptions import HTTPException, InternalServerError

app = Flask(__name__)
tasks = {}


def flask_async(f):
"""
This decorator transforms a sync route to asynchronous by running it in a background thread.
"""
@wraps(f)
def wrapped(*args, **kwargs):
def task(app, environ):
# Create a request context similar to that of the original request
with app.request_context(environ):
try:
# Run the route function and record the response
tasks[task_id]['result'] = f(*args, **kwargs)
except HTTPException as e:
tasks[task_id]['result'] = current_app.handle_http_exception(e)
except Exception as e:
# The function raised an exception, so we set a 500 error
tasks[task_id]['result'] = InternalServerError()
if current_app.debug:
# We want to find out if something happened so reraise
raise

# Assign an id to the asynchronous task
task_id = uuid.uuid4().hex

# Record the task, and then launch it
tasks[task_id] = {'task': threading.Thread(
target=task, args=(current_app._get_current_object(), request.environ))}
tasks[task_id]['task'].start()

# Return a 202 response, with an id that the client can use to obtain task status
return {'TaskId': task_id}, 202

return wrapped


@app.route('/foo')
@flask_async
def foo():
time.sleep(10)
return {'Result': True}


@app.route('/foo/<task_id>', methods=['GET'])
def foo_results(task_id):
"""
Return results of asynchronous task.
If this request returns a 202 status code, it means that task hasn't finished yet.
"""
task = tasks.get(task_id)
if task is None:
abort(404)
if 'result' not in task:
return {'TaskID': task_id}, 202
return task['result']


if __name__ == '__main__':
app.run(debug=True)
但是,您需要一个小技巧来获得结果。端点 /foo只会返回 HTTP 代码 202 和任务 ID,而不是结果。您需要另一个端点 /foo/<task_id>得到结果。这是本地主机的示例:
import time
import requests

task_ids = [requests.get('http://127.0.0.1:5000/foo').json().get('TaskId')
for _ in range(2)]
time.sleep(11)
results = [requests.get(f'http://127.0.0.1:5000/foo/{task_id}').json()
for task_id in task_ids]
# [{'Result': True}, {'Result': True}]

关于python - flask 中的后台任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40989671/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com