gpt4 book ai didi

python - Flask-Executor 和 Flask-SQLAlchemy : Can't get updated data from DB inside of executor function

转载 作者:行者123 更新时间:2023-12-05 06:48:09 27 4
gpt4 key购买 nike

我正在向我的 Web 应用程序添加一个基于 Flask 的 API,以控制某些网络自动化功能的启动和停止。我遇到了一个奇怪的行为,即 Flask-Executor .submit() 方法调用的函数似乎无法从数据库中获取新的或更新的数据。

我知道这个问题非常复杂,所以感谢所有分享时间和意见的人。有关我的项目结构的概述,请参阅此问题的结尾。

flask-executor documentation说:

When calling submit() or map() Flask-Executor will wrap ThreadPoolExecutor callables with a copy of both the current application context and current request context

我不太完全理解上下文的含义,但我觉得这可能是一个很好的提示,说明为什么这应该或不应该起作用。 (顺便说一下,我正在使用 ThreadPoolExecutor)。我假设 db SQLAlchemy 对象是应用程序上下文的一部分,因此 db 的副本应该在执行程序函数中可用。这似乎不是这种情况,因为我仍然必须在包含执行程序调用的函数的文件中import db,正如您稍后将在本文中看到的那样。

我的前端有简单的开始和停止按钮,它们将 POST 发送到以下 API 路由:

file: app/api.py

from flask import request
from flask_login import login_required
from app import app, db, executor
from app.models import Project
from datetime import datetime
from automation.Staging import control

@app.route('/api/staging/control', methods=['POST'])
@login_required
def staging_control():
data = request.json
project_id = data['project-id']
action = data['staging-control']

project = Project.query.get(project_id)
sp = project.staging_profile
current_status = sp.status

if action == 'start':
if current_status == 'STARTED':
return {'response': 200, 'message': 'Job already running!'}
else:
sp.status = 'STARTED'
db.session.commit()
# The executor only spawns the thread if the task status was not already started.
executor.submit(control.start_staging, project_id)


elif action == 'stop':
if current_status == 'STARTED':
sp.status = 'STOPPED'
db.session.commit()

return {'response' : 200, 'message': 'OK'}

背景

作业的状态存储在数据库模型中。如果 POSTed start 操作,则会更新数据库模型的状态列。同样,如果 stop 操作被发布,数据库模型的状态也会更新。

执行器对 control.start_staging 的函数调用生成一个线程,该线程开始一个无限循环,该循环执行一些工作然后休眠 X 秒。在每次循环开始时,我都试图检查数据库模型的状态列,以确定是否要中断循环并关闭线程。

启动线程工作得很好。数据库模型得到更新,执行程序生成线程,我的 while 循环开始。

从我的前端发送 stop Action 也很好。数据库中的状态设置为 STOPPED,我可以在我的数据库 shell 中通过手动查询看到这一点。

然而,最初由执行者启动的control.start_staging函数仍然认为status设置为STARTED,尽管它实际上会在线程运行期间的某个时间更新为 STOPPED。我试图从线程内部尽可能多地获取更新值。我看过this similar question .

这里是 control.start_staging 函数。我在下面的摘录中分享了一些我尝试获取更新状态的不同方法作为评论:

file: automation/Staging/control.py

from app import db
from app.models import Project, Staging_Profile
from app.config import STAGING_DURATION_MINS
from datetime import datetime, timedelta
from time import sleep


def start_staging(project_id):
project = Project.query.get(project_id)
print(f"Received START for project {project.project_name}")
sp = project.staging_profile
sp.last_activity = datetime.utcnow()
db.session.commit()
status = sp.status

# Staging Loop Start
while True:

# This just serves as a force-stop if the job runs for more than STAGING_DURATION_MINUTES minutes.
if sp.last_activity + timedelta(minutes=STAGING_DURATION_MINS) > datetime.utcnow():
print(f"Status is: {sp.status}")

# ATTEMPT 1: does not get updated data
# status = sp.status

# ATTEMPT 2: does not get updated data
# status = Staging_Profile.query.get(project.staging_profile_id).status

# ATTEMPT 3: does not get updated data
all_profiles = db.session.query(Staging_Profile).all()
this_profile = [profile for profile in all_profiles if profile.id == sp.id][0]

if this_profile.status == 'STOPPED':
print("Status is STOPPED. Returning")
break

else:
print(f"Status is {this_profile.status}")

# Do work
do_some_stuff()

else:
break
sleep(5)

return

现在,真正令人费解的是我可以从执行程序函数内部将数据写入数据库。 sp.last_activity = datetime.utcnow() 后跟 db.session.commit() 这行成功写入线程启动时的当前时间。

我的怀疑

我以非常模块化的方式构建了这个应用程序,我觉得这也许就是问题的根源。

以下是我的应用程序结构相关部分的概述:

app/
├─ __init__.py # This is where my db & executor are instantiated
├─ api.py # This is where the /api/staging/control route lives
├─ models.py # This holds my SQLAlchemy DB classes
├─ routes.py # This holds my regular front-end routes
├─ config.py # General config parameters

automation/
├─ Staging/
│ ├─ control.py # This is where the function passed to the executor is defined
│ ├─ __init__.py # Empty
├─ __init__.py # Empty

再次感谢。当我找到这个问题时,我会发布解决方案或解决方法。

最佳答案

使用更新

Project.query.filter_by(id=project_id).update({
'last_activity': datetime.utcnow()
})

关于python - Flask-Executor 和 Flask-SQLAlchemy : Can't get updated data from DB inside of executor function,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66896139/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com