- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我的任务是监听 UDP 数据报,对其进行解码(数据报具有二进制信息),将解码后的信息放入字典中,将字典转储为 json 字符串,然后将 json 字符串发送到远程服务器(ActiveMQ)。
解码和发送到远程都可能很耗时。为了使程序更具可扩展性,我们创建了两个进程(Multiprocessing.Process):
现在我需要从它创建一个合适的 linux 守护进程(可以通过 service 命令启动、停止和重新启动)。
问题:如何从 python 多处理程序创建守护进程。我没有找到关于此的指南。有没有人知道如何做到这一点,或者有工作示例。
以下文字是我为实现这一目标所做的努力:我找到了 python 守护进程的小例子:http://www.gavinj.net/2012/06/building-python-daemon-process.html所以我重写了我的代码(抱歉代码太长):
import socket
import time
import os
from select import select
import multiprocessing
from multiprocessing import Process, Queue, Value
import stomp
import json
import logging
logger = logging.getLogger("DaemonLog")
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler = logging.FileHandler("/var/log/testdaemon/testdaemon.log")
handler.setFormatter(formatter)
logger.addHandler(handler)
log = logger
#Config listner
domain = 'example.host.ru'
port = int(9930)
#Config remote queue access
queue_cfg = {
'host': 'queue.test.ru',
'port': 61113,
'user': 'user',
'password': 'pass',
'queue': '/topic/test.queue'
}
class UDPListener():
def __init__(self, domain, port, queue_cfg):
# If I initialize socket during init I see strange error:
# on the line: data, addr = sock_inst.recvfrom(int(10000))
# error: [Errno 88] Socket operation on non-socket
# So I put initialization to runListner function
#self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#self.sock.bind((domain, port))
self.domain = domain
self.port = port
self.remote_queue_cfg = queue_cfg
self.queue = Queue()
self.isWorking = Value('b', True)
self.decoder = Decoder()
self.reactor = ParallelQueueReactor(self.queue)
self.stdin_path = '/dev/null'
self.stdout_path = '/dev/tty'
self.stderr_path = '/dev/tty'
self.pidfile_path = '/var/run/testdaemon/testdaemon.pid'
self.pidfile_timeout = 5
def __assignData(self, addr, data):
receive_time = time.time()
messages = self.decoder.decode(receive_time, addr, data)
for msg in messages:
self.reactor.addMessage(msg)
def runListner(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((domain, port))
while self.isWorking.value:
inputready, outputready, exceptready = select([self.sock], [], [])
for sock_inst in inputready:
if sock_inst == self.sock:
data, addr = sock_inst.recvfrom(int(10000))
if data:
self.__assignData(addr[0], data)
self.sock.close()
def runQueueDispatcher(self):
while self.isWorking.value:
connected = False
while not connected:
try:
conn = stomp.Connection(host_and_ports=[(self.remote_queue_cfg['host'], self.remote_queue_cfg['port'])])
conn.start()
conn.connect(self.remote_queue_cfg['user'], self.remote_queue_cfg['password'], wait=True)
connected = True
except socket.error:
log.error('Could not connect to activemq server.')
time.sleep(20)
if connected == True:
while self.isWorking.value:
msg = None
if not self.queue.empty():
#Now error appear hear even when not self.queue.empty()
msg = self.queue.get()
else:
time.sleep(1)
if msg is not None:
try:
data = json.dumps(msg)
conn.send(body=data, destination=self.remote_queue_cfg['queue'])
count += 1
except:
log.error('Failed to send message to queue.')
time.sleep(1)
def stop(self):
self.isWorking.value = False
def run(self):
log.error('StartProcesses')
dispatcher_process = Process(target=self.runQueueDispatcher, name='Dispatcher')
listner_process = Process(target=self.runListner, name='Listner')
dispatcher_process.start()
listner_process.start()
dispatcher_process.join()
listner_process.join()
log.info('Finished')
#------------------------------------------------------------------
def main():
from daemon import runner
app = UDPListener(domain, port, queue_cfg)
daemon_runner = runner.DaemonRunner(app)
daemon_runner.daemon_context.files_preserve=[handler.stream]
daemon_runner.do_action()
if __name__ == "__main__":
main()
现在我在 msg = self.queue.get() 上看到错误
Traceback (most recent call last): File "/usr/lib64/python2.6/multiprocessing/process.py", line 232, in
_bootstrap
self.run() File "/usr/lib64/python2.6/multiprocessing/process.py", line 88, in run
self._target(*self._args, **self._kwargs) File "/root/ipelevan/dream/src/parallel_main.py", line 116, in runQueueDispatcher
msg = self.queue.get() File "/usr/lib64/python2.6/multiprocessing/queues.py", line 91, in get
res = self._recv() EOFError
我在手动运行 UDPListner.run() 时没有看到这个错误。但是对于 daemon runner,它看起来像是在下面创建了 UDPListner 的新实例,并且在不同的进程中我们有不同的 Queue(以及不同的 self.socket,当它在 init 中初始化时)。
最佳答案
首先:将共享对象(队列、值)的链接保留为类的成员以供进程使用是一个坏主意。它以某种方式在没有妖魔化的情况下起作用。但是,当在 DaemonContext 中运行相同的代码时,os.fork() 发生了,并以某种方式弄乱了对象的链接。我不太确定 Multiprocessing 模块是否设计为在对象的方法内 100% 正确工作。
其次:DaemonContext 有助于从 shell 中分离进程、重定向流并执行与守护进程相关的其他几项操作,但我还没有找到任何好的方法来检查这样的守护进程是否已经在运行。所以我只是用了
if os.path.isfile(pidfile_path):
print 'pidfile %s exists. Already running?' % pidfile_path
sys.exit(1)
关于python - 创建使用 Multiprocessing 和 Multiprocessing.Queues 的 linux 守护进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35293326/
我在网上搜索但没有找到任何合适的文章解释如何使用 javascript 使用 WCF 服务,尤其是 WebScriptEndpoint。 任何人都可以对此给出任何指导吗? 谢谢 最佳答案 这是一篇关于
我正在编写一个将运行 Linux 命令的 C 程序,例如: cat/etc/passwd | grep 列表 |剪切-c 1-5 我没有任何结果 *这里 parent 等待第一个 child (chi
所以我正在尝试处理文件上传,然后将该文件作为二进制文件存储到数据库中。在我存储它之后,我尝试在给定的 URL 上提供文件。我似乎找不到适合这里的方法。我需要使用数据库,因为我使用 Google 应用引
我正在尝试制作一个宏,将下面的公式添加到单元格中,然后将其拖到整个列中并在 H 列中复制相同的公式 我想在 F 和 H 列中输入公式的数据 Range("F1").formula = "=IF(ISE
问题类似于this one ,但我想使用 OperatorPrecedenceParser 解析带有函数应用程序的表达式在 FParsec . 这是我的 AST: type Expression =
我想通过使用 sequelize 和 node.js 将这个查询更改为代码取决于在哪里 select COUNT(gender) as genderCount from customers where
我正在使用GNU bash,版本5.0.3(1)-发行版(x86_64-pc-linux-gnu),我想知道为什么简单的赋值语句会出现语法错误: #/bin/bash var1=/tmp
这里,为什么我的代码在 IE 中不起作用。我的代码适用于所有浏览器。没有问题。但是当我在 IE 上运行我的项目时,它发现错误。 而且我的 jquery 类和 insertadjacentHTMl 也不
我正在尝试更改标签的innerHTML。我无权访问该表单,因此无法编辑 HTML。标签具有的唯一标识符是“for”属性。 这是输入和标签的结构:
我有一个页面,我可以在其中返回用户帖子,可以使用一些 jquery 代码对这些帖子进行即时评论,在发布新评论后,我在帖子下插入新评论以及删除 按钮。问题是 Delete 按钮在新插入的元素上不起作用,
我有一个大约有 20 列的“管道分隔”文件。我只想使用 sha1sum 散列第一列,它是一个数字,如帐号,并按原样返回其余列。 使用 awk 或 sed 执行此操作的最佳方法是什么? Accounti
我需要将以下内容插入到我的表中...我的用户表有五列 id、用户名、密码、名称、条目。 (我还没有提交任何东西到条目中,我稍后会使用 php 来做)但由于某种原因我不断收到这个错误:#1054 - U
所以我试图有一个输入字段,我可以在其中输入任何字符,但然后将输入的值小写,删除任何非字母数字字符,留下“。”而不是空格。 例如,如果我输入: 地球的 70% 是水,-!*#$^^ & 30% 土地 输
我正在尝试做一些我认为非常简单的事情,但出于某种原因我没有得到想要的结果?我是 javascript 的新手,但对 java 有经验,所以我相信我没有使用某种正确的规则。 这是一个获取输入值、检查选择
我想使用 angularjs 从 mysql 数据库加载数据。 这就是应用程序的工作原理;用户登录,他们的用户名存储在 cookie 中。该用户名显示在主页上 我想获取这个值并通过 angularjs
我正在使用 autoLayout,我想在 UITableViewCell 上放置一个 UIlabel,它应该始终位于单元格的右侧和右侧的中心。 这就是我想要实现的目标 所以在这里你可以看到我正在谈论的
我需要与 MySql 等效的 elasticsearch 查询。我的 sql 查询: SELECT DISTINCT t.product_id AS id FROM tbl_sup_price t
我正在实现代码以使用 JSON。 func setup() { if let flickrURL = NSURL(string: "https://api.flickr.com/
我尝试使用for循环声明变量,然后测试cols和rols是否相同。如果是,它将运行递归函数。但是,我在 javascript 中执行 do 时遇到问题。有人可以帮忙吗? 现在,在比较 col.1 和
我举了一个我正在处理的问题的简短示例。 HTML代码: 1 2 3 CSS 代码: .BB a:hover{ color: #000; } .BB > li:after {
我是一名优秀的程序员,十分优秀!