gpt4 book ai didi

python - 从内部取消 celery 任务?

转载 作者:行者123 更新时间:2023-11-30 23:05:07 25 4
gpt4 key购买 nike

我知道我可以返回,但我想知道是否还有其他东西,特别是对于辅助方法,其中return None的任务将迫使调用者添加样板检查每次调用。

我找到了InvalidTaskError ,但没有真正的文档 - 这是内部的事情吗?提出这个问题合适吗?

我一直在寻找类似于 self.retry()self.abort() 之类的东西,但没有看到任何东西。

这是我使用它的示例。

def helper(task, arg):
if unrecoverable_problems(arg):
# abort the task
raise InvalidTaskError()

@task(bind=True)
task_a(self, arg):
helper(task=self, arg=arg)
do_a(arg)

@task(bind=True)
task_b(self, arg):
helper(task=self, arg=arg)
do_b(arg)

最佳答案

经过更多挖掘后,我找到了一个使用 Reject 的示例;

(从文档页面复制)

The task may raise Reject to reject the task message using AMQPs basic_reject method. This will not have any effect unless Task.acks_late is enabled.

Rejecting a message has the same effect as acking it, but some brokers may implement additional functionality that can be used. For example RabbitMQ supports the concept of Dead Letter Exchanges where a queue can be configured to use a dead letter exchange that rejected messages are redelivered to.

Reject can also be used to requeue messages, but please be very careful when using this as it can easily result in an infinite message loop.

Example using reject when a task causes an out of memory condition:

import errno
from celery.exceptions import Reject

@app.task(bind=True, acks_late=True)
def render_scene(self, path):
file = get_file(path)
try:
renderer.render_scene(file)

# if the file is too big to fit in memory
# we reject it so that it's redelivered to the dead letter exchange
# and we can manually inspect the situation.
except MemoryError as exc:
raise Reject(exc, requeue=False)
except OSError as exc:
if exc.errno == errno.ENOMEM:
raise Reject(exc, requeue=False)

# For any other error we retry after 10 seconds.
except Exception as exc:
raise self.retry(exc, countdown=10)

关于python - 从内部取消 celery 任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33346409/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com