- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有 celery beat 和 celery(四个 worker )批量做一些加工步骤。其中一项任务大致是这样的:“对于每个尚未创建 Y 的 X,创建一个 Y。”
任务以半快速(10 秒)的速度定期运行。任务完成得非常快。还有其他任务正在进行中。
我已经多次遇到节拍任务明显积压的问题,因此同一任务(来自不同的节拍时间)同时执行,导致错误地重复工作。任务似乎也是乱序执行的。
是否可以限制 celery beat 以确保一次只有一个未完成的任务实例?在任务上设置类似 rate_limit=5
的设置是否是执行此操作的“正确”方法?
是否可以确保节拍任务按顺序执行,例如beat 不是分派(dispatch)任务,而是将其添加到任务链中?
除了使这些任务本身以原子方式执行并且可以安全地并发执行之外,处理此问题的最佳方法是什么?这不是我所期望的节拍任务的限制……
任务本身的定义很简单:
@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
# Do things in a database
return
这是一个实际的(清理过的)日志:
[00:00.000]
foocorp.tasks.add_y_to_xs 已发送。 id->#1[00:00.001]
收到任务:foocorp.tasks.add_y_to_xs[#1][00:10.009]
foocorp.tasks.add_y_to_xs 已发送。 id->#2[00:20.024]
foocorp.tasks.add_y_to_xs 已发送。 id->#3[00:26.747]
收到任务:foocorp.tasks.add_y_to_xs[#2][00:26.748]
任务池:应用 #2[00:26.752]
收到任务:foocorp.tasks.add_y_to_xs[#3][00:26.769]
接受的任务:foocorp.tasks.add_y_to_xs[#2] pid:26528[00:26.775]
任务 foocorp.tasks.add_y_to_xs[#2] 在 0.0197986490093 秒内成功:无[00:26.806]
任务池:应用 #1[00:26.836]
任务池:应用 #3[01:30.020]
接受的任务:foocorp.tasks.add_y_to_xs[#1] pid:26526[01:30.053]
接受的任务:foocorp.tasks.add_y_to_xs[#3] pid:26529[01:30.055]
foocorp.tasks.add_y_to_xs[#1]:为 X id #9725 添加 Y[01:30.070]
foocorp.tasks.add_y_to_xs[#3]:为 X id #9725 添加 Y[01:30.074]
任务 foocorp.tasks.add_y_to_xs[#1] 在 0.0594762689434 秒内成功:无[01:30.087]
任务 foocorp.tasks.add_y_to_xs[#3] 在 0.0352867960464 秒内成功:无我们目前使用 Celery 3.1.4 和 RabbitMQ 作为传输。
编辑 Dan,这是我的想法:
Dan,这是我最终使用的:
from sqlalchemy import func
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager
def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
lock_fn = (func.pg_advisory_xact_lock_shared
if shared else
func.pg_advisory_xact_lock)
if timeout:
conn.execute(text('SET statement_timeout TO :timeout'),
timeout=timeout)
try:
conn.execute(select([lock_fn(lock_id)]))
except DBAPIError:
return False
return True
def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
lock_fn = (func.pg_try_advisory_xact_lock_shared
if shared else
func.pg_try_advisory_xact_lock)
return conn.execute(select([lock_fn(lock_id)])).scalar()
class DatabaseLockFailed(Exception):
pass
@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
"""
Context manager which acquires a PSQL advisory transaction lock with a
specified name.
"""
lock_id = hash(name)
with engine.begin() as conn, conn.begin():
if block:
locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
timeout)
else:
locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
if not locked:
raise DatabaseLockFailed()
yield
以及 celery 任务装饰器(仅用于周期性任务):
from functools import wraps
from preo.extensions import db
def locked(name=None, block=True, timeout='1s'):
"""
Using a PostgreSQL advisory transaction lock, only runs this task if the
lock is available. Otherwise logs a message and returns `None`.
"""
def with_task(fn):
lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)
@wraps(fn)
def f(*args, **kwargs):
try:
with db_lock(db.engine, name=lock_id, block=block,
timeout=timeout):
return fn(*args, **kwargs)
except DatabaseLockFailed:
logger.error('Failed to get lock.')
return None
return f
return with_task
最佳答案
from functools import wraps
from celery import shared_task
def skip_if_running(f):
task_name = f'{f.__module__}.{f.__name__}'
@wraps(f)
def wrapped(self, *args, **kwargs):
workers = self.app.control.inspect().active()
for worker, tasks in workers.items():
for task in tasks:
if (task_name == task['name'] and
tuple(args) == tuple(task['args']) and
kwargs == task['kwargs'] and
self.request.id != task['id']):
print(f'task {task_name} ({args}, {kwargs}) is running on {worker}, skipping')
return None
return f(self, *args, **kwargs)
return wrapped
@shared_task(bind=True)
@skip_if_running
def test_single_task(self):
pass
test_single_task.delay()
关于python - celery 节拍 : Limit to single task instance at a time,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20894771/
好吧,我对我的页面发生的事情有点困惑。我在底部有页码, table 上摆满了 23 种不同的元素。 每个页面应一次显示 5 个项目。我显示的页面发送了一个带有 pagenumber = 不管怎样的 g
几周前我开始学习 Ada。我知道 limited 在某些情况下声明了一个有限类型,不允许复制对象 来自 Ada Reference Manual 2012 7.5 1/2 A limited type
我想按 DESC 选择日期并限制为最后 3 个,然后我需要基本上翻转最后 3 个结果,以便它们按 ASC 排序 任何帮助将不胜感激,我正在使用 postgresql 最佳答案 SELECT * FRO
SELECT * FROM mm_tfs WHERE product_slug LIKE '%football%' AND schoolid = '8' AND category_id ='2
SELECT * FROM mm_tfs WHERE product_slug LIKE '%football%' AND schoolid = '8' AND category_id ='2
我正在尝试访问按“日期”键排序的表中恒定数量的最新文档。请注意,不幸的是,日期是被实现的(不是由我......),使得该值设置为字符串,例如“2014-01-14”,或者有时“2014-01-14 2
我目前正在使用具有限制 cpu、限制内存以及保留 cpu 和内存的 Docker Swarm。 完成测试后,我想删除这些配置。我找不到任何有关如何删除这些的文章。 是否有办法通过更新来删除这些设置而不
我目前正在使用具有限制 cpu、限制内存以及保留 cpu 和内存的 Docker Swarm。 完成测试后,我想删除这些配置。我找不到任何有关如何删除这些的文章。 是否有办法通过更新来删除这些设置而不
我必须对我的数据应用分页。我通过复杂的连接查询获取数据,没有任何子查询,只有简单的连接。 假设这个查询[这是一个非常简单的查询,我有一个比这个复杂的查询] SELECT states.state
我经常制作条形图,并将条形图的值额外包含为注释 (geom_text)。通常,我更喜欢这些值右对齐(与将标签放在条形顶部相反)。在绘制多面条形图时,我将这些值放在每个组中的最大值(我之前计算过)加上我
delivery-limit 和 x-delivery-limit 有什么区别? 当我将 x-delivery-limit 设置为 RabbitMQ 队列参数时,我可以看到它限制了我的消息重新排队尝试
delivery-limit 和 x-delivery-limit 有什么区别? 当我将 x-delivery-limit 设置为 RabbitMQ 队列参数时,我可以看到它限制了我的消息重新排队尝试
我正在使用 PostgreSQL 9.3。这应该在具有 100,000 多行的任何表上重现。 EXPLAIN ANALYZE 显示使用 LIMIT 2 扫描了更多行,但我不明白为什么。 限制 1: E
我正在尝试找出是否可以在 PHP 中全局设置和取消设置 MySQL 结果的默认限制。 一些可能看起来像的伪代码: $pdo->prepare('SELECT * FROM example'); $pd
我有下面的代码片段表 在这里我必须对投票前 3 个值求和。 假设 product_id 3030 vote 列的总和为 8.1 和 3671 总和 是 5.2 在这里,我必须获得前 3 个 produ
我正在使用全文搜索来提取行。 我根据分数 (ORDER BY SCORE) 对行进行排序,然后在前 20 行 (LIMIT 20) 中,我想对结果集进行兰德 (RAND)。 因此,对于任何特定的搜索词
帮助创建搜索条件 SELECT * FROM mlt_adr_city WHERE name LIKE "Text%" AND region_id = 59 AND id <> 0 IF (name
MySQL 查询示例: SELECT message_id, message_text FROM messages LIMIT 0 , 30 我得到的这个提示是错误的: HIN
我注意到如果我将查询限制为 1 个而不是 5 个,速度会急剧下降。 SELECT he. * FROM homematic_events he WHERE he.homematic_devices_i
我需要从我的表中获取最后一个 ID,以便我可以在另一个函数中使用它我在我的存储库中创建了这个函数,但我没有工作,它显示了一个错误: [Syntax Error] line 0, col 60: Err
我是一名优秀的程序员,十分优秀!