gpt4 book ai didi

python - 使用 Celery 作为 Twisted 应用程序的控制 channel

转载 作者:太空狗 更新时间:2023-10-29 18:05:02 25 4
gpt4 key购买 nike

我正在尝试使用 Celery 作为 Twisted 应用程序的控制 channel 。我的 Twisted 应用程序是一个抽象层,它为各种本地运行的进程(通过 ProcessProtocol)提供标准接口(interface)。我想使用 Celery 来远程控制它——AMQP 似乎是从中央位置控制许多 Twisted 应用程序的理想方法,我想利用 Celery 基于任务的功能,例如任务重试、子任务等

这并没有像我计划的那样工作,我希望有人能帮助我指明正确的方向以实现这一目标。

我在运行脚本时试图实现的行为是:

  • 开始一个稍微修改过的celeryd(见下)
  • 等待 Celery 任务
  • 收到“启动流程”任务时,生成 ProcessProtocol
  • 当收到其他任务时,在 Twisted 协议(protocol)上运行一个函数并使用 Deferreds 返回结果

“稍微修改过的 celery ”是 celeryd有一个小的修改,允许任务通过 self.app.twisted 访问 Twisted react 器,并通过 self.app.process 访问生成的进程。为了简单起见,我使用了 Celery 的“单独”进程池实现,它不会为任务 worker 创建新进程。

当我尝试使用 Celery 任务来初始化 ProcessProtocol(即启动外部进程)时,我的问题就出现了。进程正确启动,但 ProcessProtocol 的 childDataReceived 永远不会被调用。我认为这与未正确继承/设置文件描述符有关。

下面是一些示例代码,基于 ProcessProtocol 文档中的“wc”示例。它包括两个 Celery 任务——一个用于启动 wc 进程,另一个用于计算某些文本中的单词(使用先前启动的 wc 进程)。

这个示例相当人为设计,但如果我能让它正常工作,它将作为实现我的 ProcessProtocols 的良好起点,这些 ProcessProtocols 是长期运行的进程,将响应写入标准输入的命令。

我首先通过运行 Celery 守护进程来测试它:

python2.6 mycelery.py -l info -P solo

然后,在另一个窗口中,运行发送两个任务的脚本:

python2.6 命令测试.py

command_test.py 的预期行为是执行两个命令 - 一个启动 wc 进程,另一个向 CountWordsTask 发送一些文本。实际发生的是:

  • StartProcTask 生成进程,并通过 Deffered 接收“进程启动”作为响应
  • CountWordsTask 永远不会收到结果,因为永远不会调用 childDataReceived

任何人都可以阐明这一点,或者就如何最好地使用 Celery 作为 Twisted ProcessProtocols 的控制 channel 提供一些建议吗?

为 Celery 编写一个 Twisted-backed ProcessPool 实现会更好吗?我通过 reactor.callLater 调用 WorkerCommand.execute_from_commandline 的方法是否是确保一切都发生在 Twisted 线程内的正确方法?

我已经阅读了有关 AMPoule 的资料,我认为它可以提供其中的一些功能,但如果可能的话我想坚持使用 Celery,因为我在我的应用程序的其他部分使用它。

任何帮助或协助将不胜感激!

myceleryd.py
from functools import partial
from celery.app import App
from celery.bin.celeryd import WorkerCommand
from twisted.internet import reactor


class MyCeleryApp(App):
def __init__(self, twisted, *args, **kwargs):
self.twisted = twisted
super(MyCeleryApp, self).__init__(*args, **kwargs)

def main():
get_my_app = partial(MyCeleryApp, reactor)
worker = WorkerCommand(get_app=get_my_app)
reactor.callLater(1, worker.execute_from_commandline)
reactor.run()

if __name__ == '__main__':
main()

协议(protocol).py
from twisted.internet import protocol
from twisted.internet.defer import Deferred

class WCProcessProtocol(protocol.ProcessProtocol):

def __init__(self, text):
self.text = text
self._waiting = {} # Dict to contain deferreds, keyed by command name

def connectionMade(self):
if 'startup' in self._waiting:
self._waiting['startup'].callback('process started')

def outReceived(self, data):
fieldLength = len(data) / 3
lines = int(data[:fieldLength])
words = int(data[fieldLength:fieldLength*2])
chars = int(data[fieldLength*2:])
self.transport.loseConnection()
self.receiveCounts(lines, words, chars)

if 'countWords' in self._waiting:
self._waiting['countWords'].callback(words)

def processExited(self, status):
print 'exiting'


def receiveCounts(self, lines, words, chars):
print >> sys.stderr, 'Received counts from wc.'
print >> sys.stderr, 'Lines:', lines
print >> sys.stderr, 'Words:', words
print >> sys.stderr, 'Characters:', chars

def countWords(self, text):
self._waiting['countWords'] = Deferred()
self.transport.write(text)
return self._waiting['countWords']

任务.py
from celery.task import Task
from protocol import WCProcessProtocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor

class StartProcTask(Task):
def run(self):
self.app.proc = WCProcessProtocol('testing')
self.app.proc._waiting['startup'] = Deferred()
self.app.twisted.spawnProcess(self.app.proc,
'wc',
['wc'],
usePTY=True)
return self.app.proc._waiting['startup']

class CountWordsTask(Task):
def run(self):
return self.app.proc.countWords('test test')

最佳答案

Celery 可能会在等待来自网络的新消息时阻塞。由于您在一个单线程进程中与 Twisted react 器一起运行它,因此它会阻止 react 器运行。这将禁用大部分 Twisted,这需要 react 堆实际运行(您调用了 reactor.run,但由于 Celery 阻止了它,它实际上没有运行)。

reactor.callLater 只是延迟了 Celery 的启动。一旦 Celery 启动,它仍然会阻塞 react 器。

您需要避免的问题是阻塞 react 堆。

一种解决方案是在一个线程中运行 Celery,在另一个线程中运行 react 器。使用 reactor.callFromThread 从 Celery 线程向 Twisted 发送消息(“在 react 器线程中调用函数”)。如果您需要从 Twisted 线程将消息发送回 Celery,请使用 Celery 等效项。

另一种解决方案是将 Celery 协议(protocol)(AMQP? - 请参阅 txAMQP)作为原生 Twisted 库来实现,并使用它来无阻塞地处理 Celery 消息。

关于python - 使用 Celery 作为 Twisted 应用程序的控制 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8137277/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com