gpt4 book ai didi

celery - 在 celery 中,当任务排队时,将上下文元数据从发送方进程传递给工作人员的适当方法是什么?

转载 作者:行者123 更新时间:2023-12-01 12:06:57 33 4
gpt4 key购买 nike

当任何 celery 任务排队时,我想添加工作人员将能够使用的上下文元数据。

以下代码示例有效,但我想要一个合适的 celery 式解决方案。

from celery.signals import before_task_publish, task_prerun

@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
task_kwags = body[1]
metadata = {"foo": "bar"}
task_kwags['__metadata__'] = metadata

@task_prerun.connect
def receiver_task_pre_run(task_id, task, *args, **kwargs):
metadata = kwargs['kwargs'].pop('__metadata__', {})
# metadata == {"foo": "bar"}

最佳答案

当一个任务在 worker 中启动时 before_task_publish 的内容的 header**kwargspush_request .

celery/app/tasks.py:1000

    def push_request(self, *args, **kwargs):
self.request_stack.push(Context(*args, **kwargs))

Context 的构造函数中做了一些好事. self.__dict__.update()意味着我们可以访问这些值 Context(metadata={'foo': 'bar'}).metadata
celery/app/tasks.py:99

class Context(object)
# ...
def __init__(self, *args, **kwargs):
self.update(*args, **kwargs)

def update(self, *args, **kwargs):
return self.__dict__.update(*args, **kwargs)

任务上下文可从 Task 访问的 request属性(property)。

celery/app/tasks.py:1019

class Task(object):
# ...
def _get_request(self):
"""Get current request object."""
req = self.request_stack.top
if req is None:
# task was not called, but some may still expect a request
# to be there, perhaps that should be deprecated.
if self._default_request is None:
self._default_request = Context()
return self._default_request
return req
request = property(_get_request)

这意味着最终的解决方案很简单:

from celery.signals import before_task_publish, task_prerun

@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
metadata = {"foo": "bar"}
headers['__metadata__'] = metadata

@task_prerun.connect
def receiver_task_pre_run(task_id, task, *args, **kwargs):
metadata = getattr(task.request, '__metadata__', {})
# metadata == {"foo": "bar"}

注: task.request.__metadata__也可以工作,但如果在集成信号之前将任务排入队列,则它会失败。这样更安全。

关于celery - 在 celery 中,当任务排队时,将上下文元数据从发送方进程传递给工作人员的适当方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55660979/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com