- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我正在尝试使用 Celery 作为 Twisted 应用程序的控制 channel 。我的 Twisted 应用程序是一个抽象层,它为各种本地运行的进程(通过 ProcessProtocol)提供标准接口(interface)。我想使用 Celery 来远程控制它——AMQP 似乎是从中央位置控制许多 Twisted 应用程序的理想方法,我想利用 Celery 基于任务的功能,例如任务重试、子任务等
这并没有像我计划的那样工作,我希望有人能帮助我指明正确的方向以实现这一目标。
我在运行脚本时试图实现的行为是:
“稍微修改过的 celery ”是 celeryd有一个小的修改,允许任务通过 self.app.twisted 访问 Twisted react 器,并通过 self.app.process 访问生成的进程。为了简单起见,我使用了 Celery 的“单独”进程池实现,它不会为任务 worker 创建新进程。
当我尝试使用 Celery 任务来初始化 ProcessProtocol(即启动外部进程)时,我的问题就出现了。进程正确启动,但 ProcessProtocol 的 childDataReceived 永远不会被调用。我认为这与未正确继承/设置文件描述符有关。
下面是一些示例代码,基于 ProcessProtocol 文档中的“wc”示例。它包括两个 Celery 任务——一个用于启动 wc 进程,另一个用于计算某些文本中的单词(使用先前启动的 wc 进程)。
这个示例相当人为设计,但如果我能让它正常工作,它将作为实现我的 ProcessProtocols 的良好起点,这些 ProcessProtocols 是长期运行的进程,将响应写入标准输入的命令。
我首先通过运行 Celery 守护进程来测试它:
python2.6 mycelery.py -l info -P solo
然后,在另一个窗口中,运行发送两个任务的脚本:
python2.6 命令测试.py
command_test.py 的预期行为是执行两个命令 - 一个启动 wc 进程,另一个向 CountWordsTask 发送一些文本。实际发生的是:
任何人都可以阐明这一点,或者就如何最好地使用 Celery 作为 Twisted ProcessProtocols 的控制 channel 提供一些建议吗?
为 Celery 编写一个 Twisted-backed ProcessPool 实现会更好吗?我通过 reactor.callLater 调用 WorkerCommand.execute_from_commandline 的方法是否是确保一切都发生在 Twisted 线程内的正确方法?
我已经阅读了有关 AMPoule 的资料,我认为它可以提供其中的一些功能,但如果可能的话我想坚持使用 Celery,因为我在我的应用程序的其他部分使用它。
任何帮助或协助将不胜感激!
from functools import partial
from celery.app import App
from celery.bin.celeryd import WorkerCommand
from twisted.internet import reactor
class MyCeleryApp(App):
def __init__(self, twisted, *args, **kwargs):
self.twisted = twisted
super(MyCeleryApp, self).__init__(*args, **kwargs)
def main():
get_my_app = partial(MyCeleryApp, reactor)
worker = WorkerCommand(get_app=get_my_app)
reactor.callLater(1, worker.execute_from_commandline)
reactor.run()
if __name__ == '__main__':
main()
from twisted.internet import protocol
from twisted.internet.defer import Deferred
class WCProcessProtocol(protocol.ProcessProtocol):
def __init__(self, text):
self.text = text
self._waiting = {} # Dict to contain deferreds, keyed by command name
def connectionMade(self):
if 'startup' in self._waiting:
self._waiting['startup'].callback('process started')
def outReceived(self, data):
fieldLength = len(data) / 3
lines = int(data[:fieldLength])
words = int(data[fieldLength:fieldLength*2])
chars = int(data[fieldLength*2:])
self.transport.loseConnection()
self.receiveCounts(lines, words, chars)
if 'countWords' in self._waiting:
self._waiting['countWords'].callback(words)
def processExited(self, status):
print 'exiting'
def receiveCounts(self, lines, words, chars):
print >> sys.stderr, 'Received counts from wc.'
print >> sys.stderr, 'Lines:', lines
print >> sys.stderr, 'Words:', words
print >> sys.stderr, 'Characters:', chars
def countWords(self, text):
self._waiting['countWords'] = Deferred()
self.transport.write(text)
return self._waiting['countWords']
from celery.task import Task
from protocol import WCProcessProtocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor
class StartProcTask(Task):
def run(self):
self.app.proc = WCProcessProtocol('testing')
self.app.proc._waiting['startup'] = Deferred()
self.app.twisted.spawnProcess(self.app.proc,
'wc',
['wc'],
usePTY=True)
return self.app.proc._waiting['startup']
class CountWordsTask(Task):
def run(self):
return self.app.proc.countWords('test test')
最佳答案
Celery 可能会在等待来自网络的新消息时阻塞。由于您在一个单线程进程中与 Twisted react 器一起运行它,因此它会阻止 react 器运行。这将禁用大部分 Twisted,这需要 react 堆实际运行(您调用了 reactor.run
,但由于 Celery 阻止了它,它实际上没有运行)。
reactor.callLater
只是延迟了 Celery 的启动。一旦 Celery 启动,它仍然会阻塞 react 器。
您需要避免的问题是阻塞 react 堆。
一种解决方案是在一个线程中运行 Celery,在另一个线程中运行 react 器。使用 reactor.callFromThread
从 Celery 线程向 Twisted 发送消息(“在 react 器线程中调用函数”)。如果您需要从 Twisted 线程将消息发送回 Celery,请使用 Celery 等效项。
另一种解决方案是将 Celery 协议(protocol)(AMQP? - 请参阅 txAMQP)作为原生 Twisted 库来实现,并使用它来无阻塞地处理 Celery 消息。
关于python - 使用 Celery 作为 Twisted 应用程序的控制 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8137277/
我正在尝试发送 RFC5424 中定义的结构化数据从 Twisted 到系统日志。我在 Ubuntu 上,系统日志实现是 rsyslog。 我正在使用“twistd --syslog”运行扭曲的应用程
摘要:无法自动向 Twisted 的检修孔发出命令。寻求解决方案/建议。 Twisted 有一个很棒的功能,称为 manhole。它允许用户通过 ssh 连接到当前正在运行的 Twisted 服务器并
我正在尝试发送 RFC5424 中定义的结构化数据从 Twisted 到系统日志。我在 Ubuntu 上,系统日志实现是 rsyslog。 我正在使用“twistd --syslog”运行扭曲的应用程
我正在尝试使用扭曲的 spawnProcess 启动一个进程,并使用 psutil 每 5 秒记录一次资源使用情况。首先,我尝试使用以下代码获取生成的进程的可执行文件名称: #!/usr/bin/py
Azure 网站能否托管 Twisted 应用程序?例如像这样的东西: from twisted.internet import reactor from twisted.web import ser
有没有办法限制 Twisted http 客户端的下载速率?如果没有,在 Twisted 中实现此类客户端的最简单方法是什么? 最佳答案 Twisted 中的流控制最常使用 IProducer.pau
我的基于 Twisted 的客户端循环发送 UDP 数据包。因此,我正在使用 DatagramProtocol 类。这是来源: #!/usr/bin/python # -*- coding: utf-
raven 有很多集成,包括 python 日志记录。一方面,twisted 不使用python 的日志记录。而另一方面,在twisted 中,raven 并没有直接的整合。 那么在基于扭曲的设置中使
有人告诉我,Twisted基于库Ampoule是创建在不同计算机上执行的进程池的好方法。然而,没有相关文档,Ampoule 的示例也没有说明这一点。 我对类似于 stdlib multiprocess
Twisted 最近推出了一个新的日志记录模块:twisted.logger。我已经通读了文档 [1],但我一直无法找到设置实际日志级别的位置。关于日志观察者[2] 的部分建议,如果您显式配置自己的
我想问一个关于如何关闭扭曲连接的问题 RPC . 我知道有人问过类似的问题,但似乎没有回答我的问题。 我正在做一些基本的连接,如下图所示: cfactory = pb.PBClientFactory(
我正在使用 Twisted 编写代码,但在为我的扭曲互联网延迟变量想出一个合理的变量名时遇到了麻烦。这是我的候选人: d :太通用,太短,违反了 pylint 规则 C0103。 def :与内置函数
我正在为 Web 服务器的静态部分使用 Twisted Web static.File 资源。 对于开发,我希望能够添加新文件或修改当前静态文件,而无需重新启动 Twisted 网络服务器。 我在 g
我想使用 Twisted 重建现有应用程序的通信部分。这个应用程序确实从客户端向服务器发送数据,只是这样,服务器不发送任何东西。 我如何使用 Twisted 的事件驱动概念来实现这一目标?我目前使用
我开始考虑通过扩展当前的 Twisted FTP 来实现显式 FTP。 大部分代码都很简单,实现 AUTH、PBSZ、PROT 很容易,我得到了一个有效的安全控制 channel 。 我的问题是数据通
我想实现 WebRTC 对等连接。我不知道如何设置 ICE 服务器或应该使用什么工具。任何人都可以给我建议吗?非常感谢。 最佳答案 实际上你不需要编写自己的 STUN/TURN 服务器 有很多现成的解
正如标题所示,ProcessProtocol 类上的这两个函数有什么区别?关于什么时候应该使用一种而不是另一种的文档有点稀疏? 我最好寻找一些可以证明这一点的用例示例。 最佳答案 我猜文档在这一点上有
使用下面的代码,我似乎可以相当轻松地使用 multiprocessing.reduction 在子进程中重建套接字.. import socket,os import multiprocessing
我正在尝试在 python 上使用 twisted 编写服务器。这是我文件的头部: from twisted.internet.protocol import Factory, Protocol fr
我使用 Twisted 制作了一个简单的 http 服务器,它发送 Content-Type: multipart/x-mixed-replace header 。我正在使用它来测试我想设置为接受长期
我是一名优秀的程序员,十分优秀!