gpt4 book ai didi

python - 如何从 Celery 获取发起任务执行的队列

转载 作者:行者123 更新时间:2023-12-01 07:15:51 24 4
gpt4 key购买 nike

因此,我正在创建一个监视器应用程序,它将日志从 celery 任务发送到 ELK 堆栈。

到目前为止,我已经做到了:

from project.celery import app


def monitor(app):
state = app.events.State()

def on_event_success(event):
state.event(event)

task = state.tasks.get(event['uuid'])
if task.name:
task_name = task.name
task_origin = task.origin
task_type = task.type
task_worker = task.worker
task_info = task.info()
task_log = "TASK NAME {}, TASK ORIGIN {}, TASK TYPE {}, TASK WORKER {}, TASK ARGS {}\n\n".format(task_name, task_origin, task_type, task_worker, task_info['args'])
print "SUCCESS: {}".format(task_log)

with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-succeeded': on_event_success
})
recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
application = app
monitor(app)

通过这段代码,我能够捕获任务中几乎所有可用的信息,但我没有找到一种方法来捕获哪个队列生成了任务执行。

我有两个队列:

CELERY_QUEUES = (
# Persistent task queue
Queue('celery', routing_key='celery'),
# Persistent routine task queue
Queue('routine', routing_key='routine')
)

我想知道哪个队列发起了我的任务执行,并从事件创建的任务对象中获取此信息。

最佳答案

为了做到这一点,您需要 enable the task sent event

您还需要为 task-sent 事件实现一个处理程序,就像您对 task-succeeded 所做的那样。

您的监控应用程序应至少保留所有捕获的任务 ID(event["uuid"])和路由 key (event["routing_key"]) task-sent 事件。我使用 cachetools 中的 TTLCache 来执行此操作,并且当我需要路由键信息时,我会使用任务成功和任务失败事件处理程序中的此字典。

如果您想要任务名称和参数作为示例,则需要按照我上面描述的相同方式处理 task-received 事件...

您可能想知道为什么我使用 TTLCache - 我们的 Celery 集群每天运行几百万个任务,将所有任务发送的事件数据保留在内存中很快就会占用所有可用内存。

最后,下面是缓存任务发送数据并在任务成功事件处理程序中使用它的代码:

from cachetools import TTLCache
from project.celery import app


def monitor(app):
state = app.events.State()

# keep a couple of days of history in case not acknowledged tasks are retried
task_info = TTLCache(float('inf'), 3.2 * 24 * 60 * 60)

def on_event_success(event):
nonlocal task_info
state.event(event)

task = state.tasks.get(event['uuid'])
if task.name:
task_name = task.name
task_origin = task.origin
task_type = task.type
task_worker = task.worker
t_info = task.info()
task_log = "TASK NAME {}, TASK ORIGIN {}, TASK TYPE {}, TASK WORKER {}, TASK ARGS {}".format(task_name, task_$
print("SUCCESS: {}".format(task_log))
if event["uuid"] in task_info:
cached_task = task_info[event["uuid"]]
if "routing_key" in cached_task:
print(" routing_key: {}\n\n".format(cached_task["routing_key"]))

def on_task_sent(event):
# task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange,
# routing_key, root_id, parent_id)
nonlocal task_info
if event["uuid"] not in task_info:
task_info[event["uuid"]] = {"name": event["name"],
"args": event["args"],
"queue": event["queue"],
"routing_key": event["routing_key"]}

with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-succeeded': on_event_success,
"task-sent": on_task_sent,
"*": state.event
})
recv.capture(limit=None, timeout=None, wakeup=True)


if __name__ == '__main__':
application = app
monitor(app)

我从来没有足够的时间来研究 Celery 的 celery.events.state.State 类。我确实知道它使用LRUCache来缓存一些条目,但我不确定是否可以使用它来代替我在代码中使用的TTLCache...

关于python - 如何从 Celery 获取发起任务执行的队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57962018/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com