- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我正在尝试使用 Celery 作为 Twisted 应用程序的控制 channel 。我的 Twisted 应用程序是一个抽象层,它为各种本地运行的进程(通过 ProcessProtocol)提供标准接口(interface)。我想使用 Celery 来远程控制它——AMQP 似乎是从中央位置控制许多 Twisted 应用程序的理想方法,我想利用 Celery 基于任务的功能,例如任务重试、子任务等
这并没有像我计划的那样工作,我希望有人能帮助我指明正确的方向以实现这一目标。
我在运行脚本时试图实现的行为是:
“稍微修改过的 celery ”是 celeryd有一个小的修改,允许任务通过 self.app.twisted 访问 Twisted react 器,并通过 self.app.process 访问生成的进程。为了简单起见,我使用了 Celery 的“单独”进程池实现,它不会为任务 worker 创建新进程。
当我尝试使用 Celery 任务来初始化 ProcessProtocol(即启动外部进程)时,我的问题就出现了。进程正确启动,但 ProcessProtocol 的 childDataReceived 永远不会被调用。我认为这与未正确继承/设置文件描述符有关。
下面是一些示例代码,基于 ProcessProtocol 文档中的“wc”示例。它包括两个 Celery 任务——一个用于启动 wc 进程,另一个用于计算某些文本中的单词(使用先前启动的 wc 进程)。
这个示例相当人为设计,但如果我能让它正常工作,它将作为实现我的 ProcessProtocols 的良好起点,这些 ProcessProtocols 是长期运行的进程,将响应写入标准输入的命令。
我首先通过运行 Celery 守护进程来测试它:
python2.6 mycelery.py -l info -P solo
然后,在另一个窗口中,运行发送两个任务的脚本:
python2.6 命令测试.py
command_test.py 的预期行为是执行两个命令 - 一个启动 wc 进程,另一个向 CountWordsTask 发送一些文本。实际发生的是:
任何人都可以阐明这一点,或者就如何最好地使用 Celery 作为 Twisted ProcessProtocols 的控制 channel 提供一些建议吗?
为 Celery 编写一个 Twisted-backed ProcessPool 实现会更好吗?我通过 reactor.callLater 调用 WorkerCommand.execute_from_commandline 的方法是否是确保一切都发生在 Twisted 线程内的正确方法?
我已经阅读了有关 AMPoule 的资料,我认为它可以提供其中的一些功能,但如果可能的话我想坚持使用 Celery,因为我在我的应用程序的其他部分使用它。
任何帮助或协助将不胜感激!
from functools import partial
from celery.app import App
from celery.bin.celeryd import WorkerCommand
from twisted.internet import reactor
class MyCeleryApp(App):
def __init__(self, twisted, *args, **kwargs):
self.twisted = twisted
super(MyCeleryApp, self).__init__(*args, **kwargs)
def main():
get_my_app = partial(MyCeleryApp, reactor)
worker = WorkerCommand(get_app=get_my_app)
reactor.callLater(1, worker.execute_from_commandline)
reactor.run()
if __name__ == '__main__':
main()
from twisted.internet import protocol
from twisted.internet.defer import Deferred
class WCProcessProtocol(protocol.ProcessProtocol):
def __init__(self, text):
self.text = text
self._waiting = {} # Dict to contain deferreds, keyed by command name
def connectionMade(self):
if 'startup' in self._waiting:
self._waiting['startup'].callback('process started')
def outReceived(self, data):
fieldLength = len(data) / 3
lines = int(data[:fieldLength])
words = int(data[fieldLength:fieldLength*2])
chars = int(data[fieldLength*2:])
self.transport.loseConnection()
self.receiveCounts(lines, words, chars)
if 'countWords' in self._waiting:
self._waiting['countWords'].callback(words)
def processExited(self, status):
print 'exiting'
def receiveCounts(self, lines, words, chars):
print >> sys.stderr, 'Received counts from wc.'
print >> sys.stderr, 'Lines:', lines
print >> sys.stderr, 'Words:', words
print >> sys.stderr, 'Characters:', chars
def countWords(self, text):
self._waiting['countWords'] = Deferred()
self.transport.write(text)
return self._waiting['countWords']
from celery.task import Task
from protocol import WCProcessProtocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor
class StartProcTask(Task):
def run(self):
self.app.proc = WCProcessProtocol('testing')
self.app.proc._waiting['startup'] = Deferred()
self.app.twisted.spawnProcess(self.app.proc,
'wc',
['wc'],
usePTY=True)
return self.app.proc._waiting['startup']
class CountWordsTask(Task):
def run(self):
return self.app.proc.countWords('test test')
最佳答案
Celery 可能会在等待来自网络的新消息时阻塞。由于您在一个单线程进程中与 Twisted react 器一起运行它,因此它会阻止 react 器运行。这将禁用大部分 Twisted,这需要 react 堆实际运行(您调用了 reactor.run
,但由于 Celery 阻止了它,它实际上没有运行)。
reactor.callLater
只是延迟了 Celery 的启动。一旦 Celery 启动,它仍然会阻塞 react 器。
您需要避免的问题是阻塞 react 堆。
一种解决方案是在一个线程中运行 Celery,在另一个线程中运行 react 器。使用 reactor.callFromThread
从 Celery 线程向 Twisted 发送消息(“在 react 器线程中调用函数”)。如果您需要从 Twisted 线程将消息发送回 Celery,请使用 Celery 等效项。
另一种解决方案是将 Celery 协议(protocol)(AMQP? - 请参阅 txAMQP)作为原生 Twisted 库来实现,并使用它来无阻塞地处理 Celery 消息。
关于python - 使用 Celery 作为 Twisted 应用程序的控制 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8137277/
我很好奇为什么以下不起作用。一般select用default:防止死锁,但在这种情况下不是: package main import "fmt" func main () { a := mak
我一生都无法弄清楚如何切换图像排序。图像以 (x,x,3) 格式读取,theano 要求它是 (3,x,x) 格式。我尝试更改顺序numpy.array([img[:,:,i] for i in ra
我正在向 libnl 发送单个 SSID 和频率进行扫描,但我得到了多个扫描结果以及我请求的 SSID 和频率,但我需要单个扫描结果(仅适用于请求的 SSID),如何实现这一点。请帮助我,我也发送了我
我是 Golang 的新手,但正在努力理解这门伟大的语言!请帮帮我.. 我有 2 个 channel 。 “进”和“出” channel in, out := make(chan Work),
例如我有这段代码: package main import ( "fmt" ) func main() { c1 := make(chan interface{}) close
我们使用以下调用来获取经过身份验证的用户的 ChannelID,它适用于大多数情况。一些 YouTube 用户将他们的 channel 连接到 Google+ 信息页,但在这种情况下,我们的一位用户无
case 'sinfo': const sinfo = new Discord.MessageEmbed() .addField('Server Name 🔎 :', message.guild.n
我需要让所有 channel 来创建一个 bunker 命令,这使得所有 channel 都是只读的。 最佳答案 他们变了Client.servers至 Client.guilds在 newer ve
为什么当第二个值通过另一个 go routine 发送并且没有收到发送的第一个值时, channel c 没有缓冲? package main import "fmt" func sum(s []in
据我所知,内置的 split 会将一个 3 channel Mat 拆分为三个 1 channel Mat。结果,这三个 Mat 只是具有一些不同强度的灰度。 我的意图是获得三个 3 channel
如何检测当前的 RAM 配置?我需要询问 Windows RAM 当前是在单 channel 、双 channel 还是四 channel 中运行。 我搜索了很多,并没有在这个网站或其他网站上找到任何
我需要拆分一个多 channel wav 文件并将每个 channel 编码为 mp3 文件。 我知道 gtresamer 的 deinterleave 插件,但我不确定如何将它用于 wav 文件以及
关闭。这个问题需要details or clarity .它目前不接受答案。 想要改进这个问题吗? 通过 editing this post 添加详细信息并澄清问题. 关闭 8 年前。 Improve
我正在尝试运行 Hyperledger Fabric 网络,它由单个订购者、单个对等节点和一个 cli 组成。为了学习启动 Hyperledger Fabric 网络的过程,从创建与加密相关的工件到将
我在 Laravel 中使用事件广播。我正在使用基于角色的通知访问权限。我有用于广播的自定义 auth guard。当用户连接到 channel 时,客户端将具有内部权限的 access_token
我正在编写一个使用 Elixir Channels 来处理实时事件的应用程序。我知道每个客户端将打开 1 个套接字,并且可以在其上多路复用多个 channel 。所以我的应用程序是一个聊天应用程序,其
我有一些 .wav 文件,我想转换它们的频率 (fs) 和 channel 数 (nchannels)。我在jupyter笔记本python3.6上使用ffmpeg。我使用了以下命令并且它有效。 cm
我有一个视频渲染器,它需要两个 H265 流(YUV420),我需要烘焙它们以使它们中的一个与另一个形成 alpha 蒙版。这一切都已解决并且效果很好,但是如果我按照此处的说明进行操作: ffmpeg
我运行此命令以便能够将 udp 直播流传输到可使用正在构建的移动应用程序播放的 http 直播流。 它只是一个只有音频流的流。 ffmpeg -i udp://@localhost:1111 -map
我在我的 discord.js 机器人中创建了 nuke 命令,它创建了具有相同名称、权限、主题等的 channel ,并删除了“原始” channel 。但是有一个问题,如何使 channel 与“
我是一名优秀的程序员,十分优秀!