gpt4 book ai didi

django - 接收 celery 任务中的事件

转载 作者:行者123 更新时间:2023-12-04 17:08:36 28 4
gpt4 key购买 nike

我有一个长期运行的 celery 任务,该任务遍历一系列项目并执行一些操作。

任务应该以某种方式报告当前正在处理的项目,以便最终用户知道任务的进度。

目前,我的django应用程序和celery一起坐在一台服务器上,因此我能够使用Django的模型来报告状态,但是我计划添加更多远离Django的工作人员,因此他们无法访问DB。

现在,我看到的解决方案很少:

  • 使用一些存储手动存储中间结果,例如redis或mongodb,然后可以通过网络使用它们。这让我有些担心,因为例如,如果我将使用redis,则应该在Django端同步读取状态的代码,并使用Celery任务写入状态,因此它们使用相同的键。
  • 使用REST调用从celery向Django报告状态。像PUT http://django.com/api/task/123/items_processed
  • 也许使用Celery事件系统并创建类似Item processed的事件,django将在该事件上更新计数器
  • 创建一个单独的工作程序,该工作程序在django的服务器上运行,其中django的任务仅增加items proceeded的计数,因此当任务完成某项时,它会发出increase_messages_proceeded_count.delay(task_id)

  • 我提到的解决方案是否存在任何解决方案或隐藏的问题?

    最佳答案

    可能有很多方法可以实现您的目标,但是我将按照以下方式来实现它。

    在您长期运行的celery任务中,使用django's caching framework设置进度:

    from django.core.cache import cache

    @app.task()
    def long_running_task(self, *args, **kwargs):
    key = "my_task: %s" % self.result.id
    ...
    # do whatever you need to do and set the progress
    # using cache:
    cache.set(key, progress, timeout="whatever works for you")
    ...

    然后,您要做的就是使用该 key 发出一个周期性的AJAX GET请求,并从缓存中检索进度。遵循这些原则:
     def task_progress_view(request, *args, **kwargs):
    key = request.GET.get('task_key')
    progress = cache.get(key)
    return HttpResponse(content=json.dumps({'progress': progress}),
    content_type="application/json; charset=utf-8")

    但是,请注意,如果您将服务器作为多个进程运行,请确保使用诸如memcached之类的东西,因为django的 native 缓存在各个进程之间是不一致的。另外,我可能不会将 celery 的 task_id用作 key ,但这足以用于演示目的。

    关于django - 接收 celery 任务中的事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32583897/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com