gpt4 book ai didi

python - Django celery worker 向前端发送实时状态和结果消息

转载 作者:太空狗 更新时间:2023-10-30 01:36:31 24 4
gpt4 key购买 nike

在 Django 应用程序中,我正在运行异步任务,并希望向用户显示进度、错误等。如果有错误,用户应该被重定向到一个页面,在该页面上需要额外的输入或一些操作来解决问题。从 celery 工作返回到前端的最佳通信方式是什么?

这是伪代码的基本结构:

# views.py
from tasks import run_task

def view_task():
run_task.delay()
return render(request, 'template.html')

# tasks.py
from compute_module import compute_fct

@shared_task
def run_task():
result = compute_fct()

# how to catch status update messages from compute_module while compute_fct is running??

if result == 'error':
handle_error()
else:
handle_succes()

# compute_module
import pandas as pd

def compute_fct():
# send message: status = loading file
df = pd.read_csv('test.csv')
# send message: status = computing
val = df['col'].mean()

if val is None:
return {'status':'error'}
else:
return {'status':'success','val':val}

理想情况下我想要什么:

  • compute_module.py 模块使用 python 原生记录器。通过职责分离,我希望尽可能保持日志记录的通用性,并使用标准的 python/django 记录器。但它们似乎并不是为了向前端发送消息而设计的。
  • celery 任务以某种方式处理日志,而不是将它们显示在标准输出上,而是将它们重定向到推送器
  • 前端js显示和处理消息

在 celery worker 和前端之间可能存在我不知道的标准通信方式。这种情况一定经常发生,我很惊讶它如此难以实现。在某种程度上,rabbitmq 消息队列或 aws sns 应该为此设计。以下是我看过的资源,但我觉得它们中的任何一个都不是很好,但也许我只是感到困惑。

日志记录:这似乎更多是关于在服务器端进行日志记录,而不是向用户发送消息

Celery cam 似乎是关于管理员监控任务,而不是向用户发送消息

pusher 我喜欢,但我不想让 compute_module.py 处理它。例如,我不希望在 compute_module.py 中进行任何 pusher.com 集成。我想我可以传递一个已经实例化的推送器对象,这样模块就可以推送消息,但我还是希望它是通用的

最佳答案

编辑:现在转移到 django-channels,效果很好但比下面的解决方案更复杂。

上一个:

好的,下面是我目前如何解决它的伪代码。基本上我使用 https://pusher.com/docs/javascript_quick_start服务器端将实例化对象传递到 compute_module。一个缺点是推送消息是短暂的,所以我将不得不在 LogPusher 中做一些额外的工作以将它们存储在数据库中,改天再做......

同样在我的实际实现中,我通过 $(document).ready() 中的 $.post() ajax 调用触发任务,因为小任务完成得非常快用户永远不会看到推送消息,因为未建立连接(回到那个历史消息问题)。

我上面没有提到的另一个替代路线是 https://channels.readthedocs.io/en/latest/

[编辑] 另一个解决方案是 Server-sent events其中有 django implementations ,还没有测试过。但它看起来很适合单向更新,例如从服务器到客户端(相对于 websockets 双向)。你需要一个像redis pubsub这样的消息系统获取服务器 sse 路由的更新。

通过推送器从 django 服务器进行前端更新:

# views.py
from tasks import run_task

def view_task():
run_task.delay('event')
return render(request, 'template.html', 'pusher_event':'event')


# tasks.py
import pusher
from django.conf import settings
from compute_module import compute_fct

class LogPusher(object):
def __init__(self, event):
self.pusher_client = pusher.Pusher(app_id=settings.PUSHER_APP_ID,
key=settings.PUSHER_KEY,
secret=settings.PUSHER_SECRET,
cluster=settings.PUSHER_CLUSTER, ssl=True)
self.event = event

def send(self, data):
self.pusher_client.trigger(settings.PUSHER_CHANNEL, self.event, json.dumps(data))

@shared_task
def run_task(pusher_event):

log_pusher = LogPusher(pusher_event)
result = compute_fct(log_pusher)

# how to catch status update messages from compute_module while compute_fct is running??

if result == 'error':
log_pusher.send('status':'error')
else:
log_pusher.send('status':'success')


# compute_module.py
import pandas as pd

def compute_fct(log_pusher):
# send message: status = loading file
log_pusher.send('status':'loading file')
df = pd.read_csv('test.csv')
# send message: status = computing
log_pusher.send('status':'computing')
val = df['col'].mean()

if val is None:
return {'status':'error'}
else:
return {'status':'success','val':val}


# context_processors.py
# see https://stackoverflow.com/questions/433162/can-i-access-constants-in-settings-py-from-templates-in-django
from django.conf import settings

def pusher(request):
return {'PUSHER_KEY': settings.PUSHER_KEY, 'PUSHER_CLUSTER': settings.PUSHER_CLUSTER , 'PUSHER_CHANNEL': settings.PUSHER_CHANNEL }


# template.html
<script>

var pusher = new Pusher("{{PUSHER_KEY}}", {
cluster: "{{PUSHER_CLUSTER}}",
encrypted: true
});

var channel = pusher.subscribe("{{PUSHER_CHANNEL}}");
channel.bind("{{pusher_event}}", function(data) {
// process data
});

</script>

关于python - Django celery worker 向前端发送实时状态和结果消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47841809/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com