- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在向我的 Web 应用程序添加一个基于 Flask 的 API,以控制某些网络自动化功能的启动和停止。我遇到了一个奇怪的行为,即 Flask-Executor .submit()
方法调用的函数似乎无法从数据库中获取新的或更新的数据。
我知道这个问题非常复杂,所以感谢所有分享时间和意见的人。有关我的项目结构的概述,请参阅此问题的结尾。
flask-executor documentation说:
When calling
submit()
ormap()
Flask-Executor will wrap ThreadPoolExecutor callables with a copy of both the current application context and current request context
我不太完全理解上下文的含义,但我觉得这可能是一个很好的提示,说明为什么这应该或不应该起作用。 (顺便说一下,我正在使用 ThreadPoolExecutor)。我假设 db
SQLAlchemy 对象是应用程序上下文的一部分,因此 db
的副本应该在执行程序函数中可用。这似乎不是这种情况,因为我仍然必须在包含执行程序调用的函数的文件中import db
,正如您稍后将在本文中看到的那样。
我的前端有简单的开始和停止按钮,它们将 POST 发送到以下 API 路由:
file: app/api.py
from flask import request
from flask_login import login_required
from app import app, db, executor
from app.models import Project
from datetime import datetime
from automation.Staging import control
@app.route('/api/staging/control', methods=['POST'])
@login_required
def staging_control():
data = request.json
project_id = data['project-id']
action = data['staging-control']
project = Project.query.get(project_id)
sp = project.staging_profile
current_status = sp.status
if action == 'start':
if current_status == 'STARTED':
return {'response': 200, 'message': 'Job already running!'}
else:
sp.status = 'STARTED'
db.session.commit()
# The executor only spawns the thread if the task status was not already started.
executor.submit(control.start_staging, project_id)
elif action == 'stop':
if current_status == 'STARTED':
sp.status = 'STOPPED'
db.session.commit()
return {'response' : 200, 'message': 'OK'}
作业的状态存储在数据库模型中。如果 POSTed start
操作,则会更新数据库模型的状态列。同样,如果 stop
操作被发布,数据库模型的状态也会更新。
执行器对 control.start_staging
的函数调用生成一个线程,该线程开始一个无限循环,该循环执行一些工作然后休眠 X 秒。在每次循环开始时,我都试图检查数据库模型的状态列,以确定是否要中断循环并关闭线程。
启动线程工作得很好。数据库模型得到更新,执行程序生成线程,我的 while 循环开始。
从我的前端发送 stop
Action 也很好。数据库中的状态设置为 STOPPED
,我可以在我的数据库 shell 中通过手动查询看到这一点。
然而,最初由执行者启动的control.start_staging
函数仍然认为status
设置为STARTED
,尽管它实际上会在线程运行期间的某个时间更新为 STOPPED
。我试图从线程内部尽可能多地获取更新值。我看过this similar question .
这里是 control.start_staging
函数。我在下面的摘录中分享了一些我尝试获取更新状态的不同方法作为评论:
file: automation/Staging/control.py
from app import db
from app.models import Project, Staging_Profile
from app.config import STAGING_DURATION_MINS
from datetime import datetime, timedelta
from time import sleep
def start_staging(project_id):
project = Project.query.get(project_id)
print(f"Received START for project {project.project_name}")
sp = project.staging_profile
sp.last_activity = datetime.utcnow()
db.session.commit()
status = sp.status
# Staging Loop Start
while True:
# This just serves as a force-stop if the job runs for more than STAGING_DURATION_MINUTES minutes.
if sp.last_activity + timedelta(minutes=STAGING_DURATION_MINS) > datetime.utcnow():
print(f"Status is: {sp.status}")
# ATTEMPT 1: does not get updated data
# status = sp.status
# ATTEMPT 2: does not get updated data
# status = Staging_Profile.query.get(project.staging_profile_id).status
# ATTEMPT 3: does not get updated data
all_profiles = db.session.query(Staging_Profile).all()
this_profile = [profile for profile in all_profiles if profile.id == sp.id][0]
if this_profile.status == 'STOPPED':
print("Status is STOPPED. Returning")
break
else:
print(f"Status is {this_profile.status}")
# Do work
do_some_stuff()
else:
break
sleep(5)
return
现在,真正令人费解的是我可以从执行程序函数内部将数据写入数据库。 sp.last_activity = datetime.utcnow()
后跟 db.session.commit()
这行成功写入线程启动时的当前时间。
我以非常模块化的方式构建了这个应用程序,我觉得这也许就是问题的根源。
以下是我的应用程序结构相关部分的概述:
app/
├─ __init__.py # This is where my db & executor are instantiated
├─ api.py # This is where the /api/staging/control route lives
├─ models.py # This holds my SQLAlchemy DB classes
├─ routes.py # This holds my regular front-end routes
├─ config.py # General config parameters
automation/
├─ Staging/
│ ├─ control.py # This is where the function passed to the executor is defined
│ ├─ __init__.py # Empty
├─ __init__.py # Empty
再次感谢。当我找到这个问题时,我会发布解决方案或解决方法。
最佳答案
使用更新
Project.query.filter_by(id=project_id).update({
'last_activity': datetime.utcnow()
})
关于python - Flask-Executor 和 Flask-SQLAlchemy : Can't get updated data from DB inside of executor function,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66896139/
Issue 2019/05/09 21:50:07.380 +0800 ERROR [ExecutorManager] [Azkaban] No active executors found
我的问题是:使用 Executors.newFixedThreadPool(1)?? 有意义吗? 。在两个线程(main + oneAnotherThread)场景下使用执行器服务是否高效?正在通过调
我想知道,Executors.newSingleThreadExecutor() 之间有什么区别?和 Executors.newFixedThreadPool(1) 以下摘自javadoc Unlik
我的问题是:使用 Executors.newFixedThreadPool(1) 有意义吗??。在两个线程(main + oneAnotherThread)场景中使用执行器服务是否有效?通过调用 ne
我有一个 Apache Spark 应用程序在集群模式下运行在 YARN 集群上(spark 在这个集群上有 3 个节点)。 当应用程序运行时,Spark-UI 显示 2 个执行程序(每个运行在不同的
我想知道是否有任何理由使用 Executor 而不是 ExecutorService。 据我所知,JDK 中没有实现 Executor 接口(interface),它也不是 ExecutorServi
我有多个使用 Celery Executor 的 dag,但我希望使用 Kubernetes Executor 运行一个特定的 dag。我无法推断出一种良好而可靠的方法来实现这一目标。 我有一个 ai
假设我们的 Controller 中有一个 Action 。在每次请求时,许多用户都会调用 performLogin。 def performLogin( ) = { Async {
创建和管理您自己的 ExecutorService 与使用 Spring Boot 的 @Async 方法和 @Bean 方法创建 Executor 添加一个@Bean来创建一个Executor 手动
问题从无到有,只有我在代码中所做的更改 - 安装了 RaSharper(但删除它并重新安装 Visual Studio 没有帮助)。 所以我使用 NUnit 3 来运行测试。 我有 Visual St
我们知道每个任务当时都在一个核心中执行。 假设我们有这样配置的节点集群: 10 节点。 每个节点 16 个核心。 每个节点 64 GB 内存。 我的问题是 有 1 个 16 核的执行程序和 16 个
我正在从 Jupyter Notebook 中初始化 PySpark,如下所示: from pyspark import SparkContext # conf = SparkConf().setAp
我正在向我的 Web 应用程序添加一个基于 Flask 的 API,以控制某些网络自动化功能的启动和停止。我遇到了一个奇怪的行为,即 Flask-Executor .submit() 方法调用的函数似
单元测试在本地运行良好。 在 Visual Studio 2017 托管生成代理上运行时,VSTest 任务失败并显示: 2018-12-08T10:42:16.3779907Z An excepti
我正在尝试制作一个执行器和线程的简单示例。 当我调用 newSingleThreadExecutor(new CustomThreadFactory) 时,一切顺利,但是当我使用 null 参数调用
对于一个线程,我通过以下代码段捕获未捕获的异常。但是,对于 ExecutorService executor = Executors.newFixedThreadPool(10);,如何捕获未捕获的异
我想创建一个 CompletableFuture,其返回值在 Kotlin 中的特定执行程序上运行。 下面的代码工作得很好。 return CompletableFuture.supplyAsync
考虑基本的固定线程池: Executors.newFixedThreadPool(MaxListeners) 我打算不断提交新任务 - 响应传入的 TCP 套接字服务请求。 然而,当每个任务中的Run
我们可以在定义 ThreadPoolExecutors 时提供 BlockingQueue 实现。但是,如果我使用工厂(执行器)创建单个线程池(如下所示),我想知道使用哪个阻塞队列。我猜它是一个 Li
我编写了一个程序来执行两个在 shell 前台运行的命令,直到在终端上按下 ^c。 外壳命令 ./weed master -mdir=/var/lib/qualebs/weed 上面命令的输出是 qu
我是一名优秀的程序员,十分优秀!