- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用task_prerun和 task_postrun发出信号来跟踪特定工作人员当时实际执行的任务数量。
每次任务进入时,我都会将文件中的整数加一。当任务离开时,我将其减少一个单位。
我将这些值写入文件。这意味着当两个任务在同一个工作线程下同时启动并且 task_prerrun
信号同时触发并访问同一个文件时,我必须考虑竞争条件。
我将如何处理这个问题? 我可以在全局范围内拥有一个threading.Lock
对象吗?这个锁必须在每个工作人员的基础上工作,所以我想声明它是可以的尽管不是很好的做法,但在全局范围内都是如此。
我不想想要获取正在处理的任务总数,我想获取该工作人员正在处理的任务数量强>.
The reason why is to protect the instances from being removed when the autoscaling group minimum size changes in an AWS stack... I don't want AWS to kill machines that are still processing tasks.
考虑以下示例:
import os
import time
from celery import Celery
from celery.signals import task_prerun, task_postrun
app = Celery('tasks', broker='pyamqp://guest@localhost/')
# Text file that keeps track of how many tasks are still computing.
counter_file = os.path.join(os.path.dirname(__file__), 'counter.txt')
if not os.path.exists(counter_file):
with open(counter_file, 'w') as f:
f.write('0')
@task_prerun.connect
def before(*args, **kwargs):
""" Open the counter file and increment the value in it. """
with open(counter_file, 'r+') as f:
count = int(f.read())
f.seek(0)
f.write(str(count + 1))
@task_postrun.connect
def after(*args, **kwargs):
""" Open the counter file and decrement the value in it. """
with open(counter_file, 'r+') as f:
count = int(f.read())
f.seek(0)
f.write(str(count - 1))
@app.task
def add(x, y):
time.sleep(5)
return x + y
我考虑了 @DejanLekic 提出的使用 Inspect 类的解决方案,结果成功了。这是最终的脚本,我使用两台机器将其加载到 celery 中:
# tasks.py
import os
import random
import socket
import threading
import time
from celery import Celery
from celery.signals import task_prerun, task_postrun
app = Celery('tasks', broker=os.getenv('BROKER_ADDR', 'pyamqp://guest@localhost//'))
def get_number_of_tasks_being_executed_by_this_worker(wait_before=0.01):
time.sleep(wait_before)
# Do not rely on the worker name, because we are just sure of the hostname, so we
# cannot use the detination= keyword of the inspect call.
active_tasks_by_all_workers = app.control.inspect().active()
# Filter the tasks of the workers on this machine.
active_tasks_by_this_worker = [
val for key, val in active_tasks_by_all_workers.items()
if socket.gethostname() in key
]
# Get the list of tasks of the first (and only, ideally) match.
active_tasks_by_this_worker = active_tasks_by_this_worker[0] if active_tasks_by_this_worker else []
return active_tasks_by_this_worker
def check_if_should_protect_from_autoscaling():
tasks = get_number_of_tasks_being_executed_by_this_worker()
if tasks:
print("%d tasks are still running in this worker. Ensure protection is set." % len(tasks))
# if is_not_protected_against_auto_scaling_group:
# set_aws_autoscaling_protection()
else:
print("This worker is not executing any tasks. Unsetting protection.")
# unset_aws_autoscaling_protection()
@task_postrun.connect
def after(*args, **kwargs):
# Get the number of tasks with a little delay (0.01 seconds suffice), otherwise at
# this point the current task that executed this method is shown as active.
threading.Thread(target=check_if_should_protect_from_autoscaling).start()
@app.task
def add(x, y):
time.sleep(3 * random.random())
return x + y
我正在从此脚本发送许多任务:
# dispatcher.py
import asyncio
from tasks import add
async def task():
add.delay(3, 4)
async def main():
await asyncio.gather(*[task() for i in range(200)])
if __name__ == '__main__':
asyncio.run(main())
输出日志似乎证实了预期的行为:
[2019-09-23 07:50:28,507: WARNING/ForkPoolWorker-3] 10 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:28,625: WARNING/ForkPoolWorker-1] 8 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:28,627: WARNING/ForkPoolWorker-7] 8 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:28,993: WARNING/ForkPoolWorker-4] 7 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:29,027: INFO/ForkPoolWorker-2] Task tasks.add[c3af9378-5666-42c3-9a37-5d0720b2065a] succeeded in 1.6377690890221857s: 7
[2019-09-23 07:50:29,204: INFO/ForkPoolWorker-9] Task tasks.add[9ca176ce-1590-4670-9947-4656166d224d] succeeded in 2.7913955969852395s: 7
[2019-09-23 07:50:29,224: INFO/ForkPoolWorker-5] Task tasks.add[38d005bc-ff13-4514-aba0-8601e79e67c8] succeeded in 2.0496858750120737s: 7
[2019-09-23 07:50:29,311: WARNING/ForkPoolWorker-8] 5 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:29,316: WARNING/ForkPoolWorker-6] 5 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:29,510: WARNING/ForkPoolWorker-10] 4 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:30,059: WARNING/ForkPoolWorker-2] 3 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:30,199: INFO/ForkPoolWorker-3] Task tasks.add[991d984a-4434-47a0-8c98-9508ca980f0b] succeeded in 2.7176807850482874s: 7
[2019-09-23 07:50:30,239: WARNING/ForkPoolWorker-9] 1 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:30,250: WARNING/ForkPoolWorker-5] 1 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:31,226: WARNING/ForkPoolWorker-3] This worker is not executing any tasks. Unsetting protection.
一切都好! :D
最佳答案
我们通过使用一些现成的 Celery 功能实现了 Celery 自动缩放(在 AWS 上)。对于您的要求,我们使用 Celery 的控制 API ( https://docs.celeryproject.org/en/latest/reference/celery.app.control.html )。关键是其中的检查部分。 Inspect 类可以采用 destination 参数,这是您要检查的 Celery 节点。我们不使用它,我们想要检查集群中的所有节点,但也许您可能需要以不同的方式进行操作。您应该熟悉此类及其 .active()
方法,该方法将为您提供一组工作人员或整个集群中的事件任务列表(如果未提供目标)。
关于python - 如何实现每个工作线程同步以避免与 Celery 上的信号连接的方法出现竞争条件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57998807/
我想了解 Ruby 方法 methods() 是如何工作的。 我尝试使用“ruby 方法”在 Google 上搜索,但这不是我需要的。 我也看过 ruby-doc.org,但我没有找到这种方法。
Test 方法 对指定的字符串执行一个正则表达式搜索,并返回一个 Boolean 值指示是否找到匹配的模式。 object.Test(string) 参数 object 必选项。总是一个
Replace 方法 替换在正则表达式查找中找到的文本。 object.Replace(string1, string2) 参数 object 必选项。总是一个 RegExp 对象的名称。
Raise 方法 生成运行时错误 object.Raise(number, source, description, helpfile, helpcontext) 参数 object 应为
Execute 方法 对指定的字符串执行正则表达式搜索。 object.Execute(string) 参数 object 必选项。总是一个 RegExp 对象的名称。 string
Clear 方法 清除 Err 对象的所有属性设置。 object.Clear object 应为 Err 对象的名称。 说明 在错误处理后,使用 Clear 显式地清除 Err 对象。此
CopyFile 方法 将一个或多个文件从某位置复制到另一位置。 object.CopyFile source, destination[, overwrite] 参数 object 必选
Copy 方法 将指定的文件或文件夹从某位置复制到另一位置。 object.Copy destination[, overwrite] 参数 object 必选项。应为 File 或 F
Close 方法 关闭打开的 TextStream 文件。 object.Close object 应为 TextStream 对象的名称。 说明 下面例子举例说明如何使用 Close 方
BuildPath 方法 向现有路径后添加名称。 object.BuildPath(path, name) 参数 object 必选项。应为 FileSystemObject 对象的名称
GetFolder 方法 返回与指定的路径中某文件夹相应的 Folder 对象。 object.GetFolder(folderspec) 参数 object 必选项。应为 FileSy
GetFileName 方法 返回指定路径(不是指定驱动器路径部分)的最后一个文件或文件夹。 object.GetFileName(pathspec) 参数 object 必选项。应为
GetFile 方法 返回与指定路径中某文件相应的 File 对象。 object.GetFile(filespec) 参数 object 必选项。应为 FileSystemObject
GetExtensionName 方法 返回字符串,该字符串包含路径最后一个组成部分的扩展名。 object.GetExtensionName(path) 参数 object 必选项。应
GetDriveName 方法 返回包含指定路径中驱动器名的字符串。 object.GetDriveName(path) 参数 object 必选项。应为 FileSystemObjec
GetDrive 方法 返回与指定的路径中驱动器相对应的 Drive 对象。 object.GetDrive drivespec 参数 object 必选项。应为 FileSystemO
GetBaseName 方法 返回字符串,其中包含文件的基本名 (不带扩展名), 或者提供的路径说明中的文件夹。 object.GetBaseName(path) 参数 object 必
GetAbsolutePathName 方法 从提供的指定路径中返回完整且含义明确的路径。 object.GetAbsolutePathName(pathspec) 参数 object
FolderExists 方法 如果指定的文件夹存在,则返回 True;否则返回 False。 object.FolderExists(folderspec) 参数 object 必选项
FileExists 方法 如果指定的文件存在返回 True;否则返回 False。 object.FileExists(filespec) 参数 object 必选项。应为 FileS
我是一名优秀的程序员,十分优秀!