gpt4 book ai didi

python - 在 Celery 中实现 Twisted 风格的本地多重延迟回调

转载 作者:太空宇宙 更新时间:2023-11-04 03:52:40 24 4
gpt4 key购买 nike

我对使用 Celery 很陌生,想知道如何在 Celery 中实现 TWSITED 类型的多个延迟回调

我的 TWISTED 代码使用透视代理,如下所示。我有一个处理程序(服务器),它处理一些事件并返回结果。调度程序(客户端)打印使用延迟回调返回的结果。

Handler.py(服务器端)

from twisted.application import service, internet
from twisted.internet import reactor, task
from twisted.spread import pb
from Dispatcher import Event
from Dispatcher import CopyEvent

class ReceiverEvent(pb.RemoteCopy, Event):
pass
pb.setUnjellyableForClass(CopyEvent, ReceiverEvent)


class Handler(pb.Root):

def remote_eventEnqueue(self, pond):
d = task.deferLater(reactor,5,handle_event,sender=self)
return d

def handle_event(sender):
print "Do Something"
return "did something"

if __name__ == '__main__':
h=Handler()
reactor.listenTCP(8739, pb.PBServerFactory(h))
reactor.run()

现在是 Dispatcher.py(客户端)

from twisted.spread import pb, jelly
from twisted.python import log
from twisted.internet import reactor
from Event import Event

class CopyEvent(Event, pb.Copyable):
pass

class Dispatcher:
def __init__(self, event):
self.event = event

def dispatch_event(self, remote):
d = remote.callRemote("eventEnqueue", self.event)
d.addCallback(self.printMessage)

def printMessage(self, text):
print text

def main():
from Handler import CopyEvent
event = CopyEvent()
d = Dispatcher(event)
factory = pb.PBClientFactory()
reactor.connectTCP("localhost", 8739, factory)
deferred = factory.getRootObject()
deferred.addCallback(d.dispatch_event)
reactor.run()

if __name__ == '__main__':
main()

我尝试在 Celery 中实现它。

Handler.py(服务器端)

from celery import Celery

app=Celery('tasks',backend='amqp',broker='amqp://guest@localhost//')

@app.task

def handle_event():
print "Do Something"
return "did something"

Dispatcher.py(客户端)

from Handler import handle_event
from datetime import datetime

def print_message(text):
print text


t=handle_event.apply_async(countdown=10,link=print_message.s('Done')) ##HOWTO?

我的确切问题是如何在 Celery 中的 print_message 等本地函数上实现延迟回调 TWISTED 样式。当 handle_Event 方法完成时,它返回结果,我希望在该结果上有另一个本地回调方法 (print_message)

在 Celery 中还有其他可能的设计工作流程吗?

谢谢

小红书

最佳答案

好吧,终于明白了。不太可能像 Twisted 样式那样直接在 Celery 客户端中添加回调。但 Celery 支持任务监控功能,使客户端能够监控不同类型的 worker 事件并为其添加回调。

一个简单的任务监视器 (Task_Monitor.py) 看起来像这样。 (详见Celery实加工文档http://docs.celeryproject.org/en/latest/userguide/monitoring.html#real-time-processing)

Task_Monitor.py

from celery import Celery

def task_monitor(app):
state = app.events.State()

def announce_completed_tasks(event):
state.event(event)
task = state.tasks.get(event['uuid'])

print('TASK SUCCEEDED: %s[%s] %s' % (task.name, task.uuid, task.info(), ))

with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={'task-succeeded': announce_completed_tasks})
recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
app = Celery(broker='amqp://guest@REMOTEHOST//')
task_monitor(app)

Task_Monitor.py 必须作为单独的进程(客户端)运行。除了 Celery 应用程序(服务器端)需要使用

app.conf.CELERY_SEND_EVENTS = TRUE

或在运行 celery 时使用 -E 选项

以便它发送事件以便监控工作人员。

关于python - 在 Celery 中实现 Twisted 风格的本地多重延迟回调,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20475539/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com