- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我的任务是监听 UDP 数据报,对其进行解码(数据报具有二进制信息),将解码后的信息放入字典中,将字典转储为 json 字符串,然后将 json 字符串发送到远程服务器(ActiveMQ)。
解码和发送到远程都可能很耗时。为了使程序更具可扩展性,我们创建了两个进程(Multiprocessing.Process):
现在我需要从它创建一个合适的 linux 守护进程(可以通过 service 命令启动、停止和重新启动)。
问题:如何从 python 多处理程序创建守护进程。我没有找到关于此的指南。有没有人知道如何做到这一点,或者有工作示例。
以下文字是我为实现这一目标所做的努力:我找到了 python 守护进程的小例子:http://www.gavinj.net/2012/06/building-python-daemon-process.html所以我重写了我的代码(抱歉代码太长):
import socket
import time
import os
from select import select
import multiprocessing
from multiprocessing import Process, Queue, Value
import stomp
import json
import logging
logger = logging.getLogger("DaemonLog")
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler = logging.FileHandler("/var/log/testdaemon/testdaemon.log")
handler.setFormatter(formatter)
logger.addHandler(handler)
log = logger
#Config listner
domain = 'example.host.ru'
port = int(9930)
#Config remote queue access
queue_cfg = {
'host': 'queue.test.ru',
'port': 61113,
'user': 'user',
'password': 'pass',
'queue': '/topic/test.queue'
}
class UDPListener():
def __init__(self, domain, port, queue_cfg):
# If I initialize socket during init I see strange error:
# on the line: data, addr = sock_inst.recvfrom(int(10000))
# error: [Errno 88] Socket operation on non-socket
# So I put initialization to runListner function
#self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#self.sock.bind((domain, port))
self.domain = domain
self.port = port
self.remote_queue_cfg = queue_cfg
self.queue = Queue()
self.isWorking = Value('b', True)
self.decoder = Decoder()
self.reactor = ParallelQueueReactor(self.queue)
self.stdin_path = '/dev/null'
self.stdout_path = '/dev/tty'
self.stderr_path = '/dev/tty'
self.pidfile_path = '/var/run/testdaemon/testdaemon.pid'
self.pidfile_timeout = 5
def __assignData(self, addr, data):
receive_time = time.time()
messages = self.decoder.decode(receive_time, addr, data)
for msg in messages:
self.reactor.addMessage(msg)
def runListner(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((domain, port))
while self.isWorking.value:
inputready, outputready, exceptready = select([self.sock], [], [])
for sock_inst in inputready:
if sock_inst == self.sock:
data, addr = sock_inst.recvfrom(int(10000))
if data:
self.__assignData(addr[0], data)
self.sock.close()
def runQueueDispatcher(self):
while self.isWorking.value:
connected = False
while not connected:
try:
conn = stomp.Connection(host_and_ports=[(self.remote_queue_cfg['host'], self.remote_queue_cfg['port'])])
conn.start()
conn.connect(self.remote_queue_cfg['user'], self.remote_queue_cfg['password'], wait=True)
connected = True
except socket.error:
log.error('Could not connect to activemq server.')
time.sleep(20)
if connected == True:
while self.isWorking.value:
msg = None
if not self.queue.empty():
#Now error appear hear even when not self.queue.empty()
msg = self.queue.get()
else:
time.sleep(1)
if msg is not None:
try:
data = json.dumps(msg)
conn.send(body=data, destination=self.remote_queue_cfg['queue'])
count += 1
except:
log.error('Failed to send message to queue.')
time.sleep(1)
def stop(self):
self.isWorking.value = False
def run(self):
log.error('StartProcesses')
dispatcher_process = Process(target=self.runQueueDispatcher, name='Dispatcher')
listner_process = Process(target=self.runListner, name='Listner')
dispatcher_process.start()
listner_process.start()
dispatcher_process.join()
listner_process.join()
log.info('Finished')
#------------------------------------------------------------------
def main():
from daemon import runner
app = UDPListener(domain, port, queue_cfg)
daemon_runner = runner.DaemonRunner(app)
daemon_runner.daemon_context.files_preserve=[handler.stream]
daemon_runner.do_action()
if __name__ == "__main__":
main()
现在我在 msg = self.queue.get() 上看到错误
Traceback (most recent call last): File "/usr/lib64/python2.6/multiprocessing/process.py", line 232, in
_bootstrap
self.run() File "/usr/lib64/python2.6/multiprocessing/process.py", line 88, in run
self._target(*self._args, **self._kwargs) File "/root/ipelevan/dream/src/parallel_main.py", line 116, in runQueueDispatcher
msg = self.queue.get() File "/usr/lib64/python2.6/multiprocessing/queues.py", line 91, in get
res = self._recv() EOFError
我在手动运行 UDPListner.run() 时没有看到这个错误。但是对于 daemon runner,它看起来像是在下面创建了 UDPListner 的新实例,并且在不同的进程中我们有不同的 Queue(以及不同的 self.socket,当它在 init 中初始化时)。
最佳答案
首先:将共享对象(队列、值)的链接保留为类的成员以供进程使用是一个坏主意。它以某种方式在没有妖魔化的情况下起作用。但是,当在 DaemonContext 中运行相同的代码时,os.fork() 发生了,并以某种方式弄乱了对象的链接。我不太确定 Multiprocessing 模块是否设计为在对象的方法内 100% 正确工作。
其次:DaemonContext 有助于从 shell 中分离进程、重定向流并执行与守护进程相关的其他几项操作,但我还没有找到任何好的方法来检查这样的守护进程是否已经在运行。所以我只是用了
if os.path.isfile(pidfile_path):
print 'pidfile %s exists. Already running?' % pidfile_path
sys.exit(1)
关于python - 创建使用 Multiprocessing 和 Multiprocessing.Queues 的 linux 守护进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35293326/
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 要求我们推荐或查找工具、库或最喜欢的场外资源的问题对于 Stack Overflow 来说是偏离主题的,
Linux 管道可以缓冲多少数据?这是可配置的吗? 如果管道的两端在同一个进程中,但线程不同,这会有什么不同吗? 请注意:这个“同一个进程,两个线程”的问题是理论上的边栏,真正的问题是关于缓冲的。 最
我找到了here [最后一页] 一种有趣的通过 Linux 启动 Linux 的方法。不幸的是,它只是被提及,我在网上找不到任何有用的链接。那么有人听说过一种避免引导加载程序而使用 Linux 的方法
很难说出这里要问什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或夸夸其谈,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开,visit the help center . 关闭 1
我试图了解 ld-linux.so 如何在 Linux 上解析对版本化符号的引用。我有以下文件: 测试.c: void f(); int main() { f(); } a.c 和 b.c:
与 RetroPie 的工作原理类似,我可以使用 Linux 应用程序作为我的桌面环境吗?我实际上并不需要像实际桌面和安装应用程序这样的东西。我只需要一种干净简单的方法来在 RaspberryPi 上
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 这个问题似乎不是关于 a specific programming problem, a softwar
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题吗? Update the question所以它是on-topic用于堆栈溢出。 关闭 10 年前。 Improve thi
有什么方法可以覆盖现有的源代码,我应该用 PyQt、PyGTK、Java 等从头开始构建吗? 最佳答案 如果您指的是软件本身而不是它所连接的存储库,那么自定义应用程序的方法就是 fork 项目。据我所
我的情况是:我在一个磁盘上安装了两个 linux。我将第一个安装在/dev/sda1 中,然后在/dev/sda2 中安装第二个然后我运行第一个系统,我写了一个脚本来在第一个系统运行时更新它。
我在 i2c-0 总线上使用地址为 0x3f 的系统监视器设备。该设备在设备树中配置有 pmbus 驱动程序。 问题是,加载 linux 内核时,这个“Sysmon”设备没有供电。因此,当我在总线 0
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题吗? Update the question所以它是on-topic用于堆栈溢出。 关闭 11 年前。 Improve thi
我正试图在 linux 模块中分配一大块内存,而 kalloc 做不到。 我知道唯一的方法是使用 alloc_bootmem(unsigned long size) 但我只能从 linux 内核而不是
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 这个问题似乎不是关于 a specific programming problem, a softwar
我有 .sh 文件来运行应用程序。在该文件中,我想动态设置服务器名称,而不是每次都配置。 我尝试了以下方法,它在 CentOS 中运行良好。 nohup /voip/java/jdk1.8.0_71/
我是在 Linux 上开发嵌入式 C++ 程序的新手。我有我的 Debian 操作系统,我在其中开发和编译了我的 C++ 项目(一个简单的控制台进程)。 我想将我的应用程序放到另一个 Debian 操
关闭。这个问题需要多问focused 。目前不接受答案。 想要改进此问题吗?更新问题,使其仅关注一个问题 editing this post . 已关闭 4 年前。 Improve this ques
我使用4.19.78版本的稳定内核,我想找到带有企鹅二进制数据的C数组。系统启动时显示。我需要在哪里搜索该内容? 我在 include/linux/linux_logo.h 文件中只找到了一些 Log
我知道可以使用 gdb 的服务器模式远程调试代码,我知道可以调试针对另一种架构交叉编译的代码,但是是否可以更进一步,从远程调试 Linux 应用程序OS X 使用 gdbserver? 最佳答案 当然
是否有任何可能的方法来运行在另一个 Linux 上编译的二进制文件?我知道当然最简单的是在另一台机器上重建它,但假设我们唯一能得到的是一个二进制文件,那么这可能与否? (我知道这可能并不容易,但我只是
我是一名优秀的程序员,十分优秀!