- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有 celery beat 和 celery(四个 worker )批量做一些加工步骤。其中一项任务大致是这样的:“对于每个尚未创建 Y 的 X,创建一个 Y。”
任务以半快速(10 秒)的速度定期运行。任务完成得非常快。还有其他任务正在进行中。
我已经多次遇到节拍任务明显积压的问题,因此同一任务(来自不同的节拍时间)同时执行,导致错误地重复工作。任务似乎也是乱序执行的。
是否可以限制 celery beat 以确保一次只有一个未完成的任务实例?在任务上设置类似 rate_limit=5
的设置是否是执行此操作的“正确”方法?
是否可以确保节拍任务按顺序执行,例如beat 不是分派(dispatch)任务,而是将其添加到任务链中?
除了使这些任务本身以原子方式执行并且可以安全地并发执行之外,处理此问题的最佳方法是什么?这不是我所期望的节拍任务的限制……
任务本身的定义很简单:
@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
# Do things in a database
return
这是一个实际的(清理过的)日志:
[00:00.000]
foocorp.tasks.add_y_to_xs 已发送。 id->#1[00:00.001]
收到任务:foocorp.tasks.add_y_to_xs[#1][00:10.009]
foocorp.tasks.add_y_to_xs 已发送。 id->#2[00:20.024]
foocorp.tasks.add_y_to_xs 已发送。 id->#3[00:26.747]
收到任务:foocorp.tasks.add_y_to_xs[#2][00:26.748]
任务池:应用 #2[00:26.752]
收到任务:foocorp.tasks.add_y_to_xs[#3][00:26.769]
接受的任务:foocorp.tasks.add_y_to_xs[#2] pid:26528[00:26.775]
任务 foocorp.tasks.add_y_to_xs[#2] 在 0.0197986490093 秒内成功:无[00:26.806]
任务池:应用 #1[00:26.836]
任务池:应用 #3[01:30.020]
接受的任务:foocorp.tasks.add_y_to_xs[#1] pid:26526[01:30.053]
接受的任务:foocorp.tasks.add_y_to_xs[#3] pid:26529[01:30.055]
foocorp.tasks.add_y_to_xs[#1]:为 X id #9725 添加 Y[01:30.070]
foocorp.tasks.add_y_to_xs[#3]:为 X id #9725 添加 Y[01:30.074]
任务 foocorp.tasks.add_y_to_xs[#1] 在 0.0594762689434 秒内成功:无[01:30.087]
任务 foocorp.tasks.add_y_to_xs[#3] 在 0.0352867960464 秒内成功:无我们目前使用 Celery 3.1.4 和 RabbitMQ 作为传输。
编辑 Dan,这是我的想法:
Dan,这是我最终使用的:
from sqlalchemy import func
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager
def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
lock_fn = (func.pg_advisory_xact_lock_shared
if shared else
func.pg_advisory_xact_lock)
if timeout:
conn.execute(text('SET statement_timeout TO :timeout'),
timeout=timeout)
try:
conn.execute(select([lock_fn(lock_id)]))
except DBAPIError:
return False
return True
def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
lock_fn = (func.pg_try_advisory_xact_lock_shared
if shared else
func.pg_try_advisory_xact_lock)
return conn.execute(select([lock_fn(lock_id)])).scalar()
class DatabaseLockFailed(Exception):
pass
@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
"""
Context manager which acquires a PSQL advisory transaction lock with a
specified name.
"""
lock_id = hash(name)
with engine.begin() as conn, conn.begin():
if block:
locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
timeout)
else:
locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
if not locked:
raise DatabaseLockFailed()
yield
以及 celery 任务装饰器(仅用于周期性任务):
from functools import wraps
from preo.extensions import db
def locked(name=None, block=True, timeout='1s'):
"""
Using a PostgreSQL advisory transaction lock, only runs this task if the
lock is available. Otherwise logs a message and returns `None`.
"""
def with_task(fn):
lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)
@wraps(fn)
def f(*args, **kwargs):
try:
with db_lock(db.engine, name=lock_id, block=block,
timeout=timeout):
return fn(*args, **kwargs)
except DatabaseLockFailed:
logger.error('Failed to get lock.')
return None
return f
return with_task
最佳答案
from functools import wraps
from celery import shared_task
def skip_if_running(f):
task_name = f'{f.__module__}.{f.__name__}'
@wraps(f)
def wrapped(self, *args, **kwargs):
workers = self.app.control.inspect().active()
for worker, tasks in workers.items():
for task in tasks:
if (task_name == task['name'] and
tuple(args) == tuple(task['args']) and
kwargs == task['kwargs'] and
self.request.id != task['id']):
print(f'task {task_name} ({args}, {kwargs}) is running on {worker}, skipping')
return None
return f(self, *args, **kwargs)
return wrapped
@shared_task(bind=True)
@skip_if_running
def test_single_task(self):
pass
test_single_task.delay()
关于python - celery 节拍 : Limit to single task instance at a time,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20894771/
我知道我的问题有点含糊,但我不知道如何描述它。我问过很多地方,但似乎没有人理解我为什么要这样做。但请耐心等待,我会解释为什么我想要这样的东西。 我使用 Liquid Templates 允许用户在我的
这个问题在这里已经有了答案: what is the difference between null != object and object!=null [duplicate] (2 个回答) 7年
当我在我的本地主机 Google App Engine 应用程序中将日志记录级别更改为 FINE 时,我开始在我的跟踪堆栈中看到这些: Apr 17, 2013 4:54:20 PM com.goog
Python 有内置函数 type : class type(object) With one argument, return the type of an object. The return v
我正在使用深度学习进行语义分割,我遇到了以下术语:语义分割、实例检测、对象检测 和对象分割. 它们有什么区别? 最佳答案 这些术语的某些用法对用户而言是主观的或依赖于上下文,但据我所知对这些术语的合理
我面临 -[NSConcreteMutableData release] 的问题:消息发送到已释放的实例,我也附上了我的示例代码。 - (IBAction)uploadImage { NSString
我试图显示模型中的单个实例(数据库行),其中多个实例共享多行的相同字段(列)值。为了澄清这一说法,我有以下情况: ID/Title/Slug/Modified 1 Car A 1s ag
我正在尝试使用mockito来模拟服务。然而,我没有找到一种方法来告诉mockito,给定一个类的实例返回给我相同的实例: 类似于: given(service.add(any(Individua
我知道如何从父类(super class)原型(prototype)创建子类原型(prototype)。但是,如果我已经有了父类(super class)对象的实例来创建子类对象怎么办? 在 JS 中
鉴于 Kotlin 1.1。对于某个类的 instance,instance::class.java 和 instance.javaClass 似乎几乎是等价的: val i = 0 println(
这个问题在这里已经有了答案: 8年前关闭。 Possible Duplicate: Find out the instance id from within an ec2 machine 我正在寻找从
为什么我的 Instantiate 函数没有创建 That 的“空白”实例? 我有以下最小类: classdef That < handle properties This = '' end
Session session = HibernateUtil.getSessionFactory().openSession(); Transaction tx = session.beginTra
考虑以下几点: public class A { public String name = "i am a A instance"; } public class B extends A {
我正在使用 Scalr 来扩展网站服务器。 在 Apache 服务器上,我安装了 Sakai,并为 Linux 机器创建了一个启动脚本。 问题是,如何确保MySQL实例在Apache服务器启动之前启动
Android Realm DB 允许使用 Realm.getInstance() 获取多个实例。这些中的最佳实践是什么? :1.创建单个实例(应用程序范围)并在任何地方使用它2. 在需要时获取一个新
我很难理解为什么修改实例 a 中的属性会修改实例 b 中的相同属性。 var A = function (){ }; A.prototype.data = { value : 0 }; var
我将 Weka 用作更长管道的一部分,因此,我无法承受将所有数据写入文件或数据库只是为了创建一个 Instances。目的。我可以即时做的是创建 Instance 的列表对象。 来自 this pag
class C: def func(self, a): print(a) c = C() print(c.__dict__) # {} c.func = c.func # c.func i
Angular Routing 文档提到了组件实例创建、组件实例激活和路由激活。 文档没有解释这些概念的区别,以及每次创建/激活发生的时间。 问题 实例创建和实例激活有什么区别? 实例激活和路由激活有
我是一名优秀的程序员,十分优秀!