- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我试图在一个管理进程下启动一个数据队列服务器(这样它以后可以变成一个服务),虽然数据队列服务器功能在主进程中工作正常,但它在一个进程中不起作用使用 multiprocessing.Process 创建的进程。
dataQueueServer 和 dataQueueClient 代码基于多处理模块文档中的代码 here .
当单独运行时,dataQueueServer 运行良好。但是,当在 mpqueue 中使用 multiprocessing.Process
的 start()
运行时,它不起作用(在客户端测试时) .我正在使用未更改的 dataQueueClient 来测试这两种情况。
在这两种情况下,代码确实到达了 serve_forever
,所以我认为服务器正在工作,但在 mpqueue 情况下,某些东西阻止它与客户端通信.
我已将运行 serve_forever()
部分的循环置于线程下,以便它可以停止。
代码如下:
mpqueue # 这是试图在子进程中生成服务器的“管理器”进程
import time
import multiprocessing
import threading
import dataQueueServer
class Printer():
def __init__(self):
self.lock = threading.Lock()
def tsprint(self, text):
with self.lock:
print text
class QueueServer(multiprocessing.Process):
def __init__(self, name = '', printer = None):
multiprocessing.Process.__init__(self)
self.name = name
self.printer = printer
self.ml = dataQueueServer.MainLoop(name = 'ml', printer = self.printer)
def run(self):
self.printer.tsprint(self.ml)
self.ml.start()
def stop(self):
self.ml.stop()
if __name__ == '__main__':
printer = Printer()
qs = QueueServer(name = 'QueueServer', printer = printer)
printer.tsprint(qs)
printer.tsprint('starting')
qs.start()
printer.tsprint('started.')
printer.tsprint('Press Ctrl-C to quit')
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
printer.tsprint('\nTrying to exit cleanly...')
qs.stop()
printer.tsprint('stopped')
数据队列服务器
import time
import threading
from multiprocessing.managers import BaseManager
from multiprocessing import Queue
HOST = ''
PORT = 50010
AUTHKEY = 'authkey'
## Define some helper functions for use by the main process loop
class Printer():
def __init__(self):
self.lock = threading.Lock()
def tsprint(self, text):
with self.lock:
print text
class QueueManager(BaseManager):
pass
class MainLoop(threading.Thread):
"""A thread based loop manager, allowing termination signals to be sent
to the thread"""
def __init__(self, name = '', printer = None):
threading.Thread.__init__(self)
self._stopEvent = threading.Event()
self.daemon = True
self.name = name
if printer is None:
self.printer = Printer()
else:
self.printer = printer
## create the queue
self.queue = Queue()
## Add a function to the handler to return the queue to clients
self.QM = QueueManager
self.QM.register('get_queue', callable=lambda:self.queue)
self.queue_manager = self.QM(address=(HOST, PORT), authkey=AUTHKEY)
self.queue_server = self.queue_manager.get_server()
def __del__(self):
self.printer.tsprint( 'closing...')
def run(self):
self.printer.tsprint( '{}: started serving'.format(self.name))
self.queue_server.serve_forever()
def stop(self):
self.printer.tsprint ('{}: stopping'.format(self.name))
self._stopEvent.set()
def stopped(self):
return self._stopEvent.isSet()
def start():
printer = Printer()
ml = MainLoop(name = 'ml', printer = printer)
ml.start()
return ml
def stop(ml):
ml.stop()
if __name__ == '__main__':
ml = start()
raw_input("\nhit return to stop")
stop(ml)
还有一个客户:
数据队列客户端
import datetime
from multiprocessing.managers import BaseManager
n = 0
N = 10**n
HOST = ''
PORT = 50010
AUTHKEY = 'authkey'
def now():
return datetime.datetime.now()
def gen(n, func, *args, **kwargs):
k = 0
while k < n:
yield func(*args, **kwargs)
k += 1
class QueueManager(BaseManager):
pass
QueueManager.register('get_queue')
m = QueueManager(address=(HOST, PORT), authkey=AUTHKEY)
m.connect()
queue = m.get_queue()
def load(msg, q):
return q.put(msg)
def get(q):
return q.get()
lgen = gen(N, load, msg = 'hello', q = queue)
t0 = now()
while True:
try:
lgen.next()
except StopIteration:
break
t1 = now()
print 'loaded %d items in ' % N, t1-t0
t0 = now()
while queue.qsize() > 0:
queue.get()
t1 = now()
print 'got %d items in ' % N, t1-t0
最佳答案
所以看起来解决方案很简单:不要使用 serve_forever()
,而是使用 manager.start()
。
根据 Eli Bendersky ,BaseManager
(及其扩展版本 SyncManager
)已经在新进程中生成服务器(查看 multiprocessing.managers 代码证实了这一点)。我一直遇到的问题源于示例中使用的表单,其中服务器在主进程下启动。
我仍然不明白为什么当前示例在子进程下运行时不起作用,但这不再是问题。
这是管理多个队列服务器的工作代码(从 OP 大大简化):
服务器:
from multiprocessing import Queue
from multiprocessing.managers import SyncManager
HOST = ''
PORT0 = 5011
PORT1 = 5012
PORT2 = 5013
AUTHKEY = 'authkey'
name0 = 'qm0'
name1 = 'qm1'
name2 = 'qm2'
description = 'Queue Server'
def CreateQueueServer(HOST, PORT, AUTHKEY, name = None, description = None):
name = name
description = description
q = Queue()
class QueueManager(SyncManager):
pass
QueueManager.register('get_queue', callable = lambda: q)
QueueManager.register('get_name', callable = name)
QueueManager.register('get_description', callable = description)
manager = QueueManager(address = (HOST, PORT), authkey = AUTHKEY)
manager.start() # This actually starts the server
return manager
# Start three queue servers
qm0 = CreateQueueServer(HOST, PORT0, AUTHKEY, name0, description)
qm1 = CreateQueueServer(HOST, PORT1, AUTHKEY, name1, description)
qm2 = CreateQueueServer(HOST, PORT2, AUTHKEY, name2, description)
raw_input("return to end")
客户:
from multiprocessing.managers import SyncManager
HOST = ''
PORT0 = 5011
PORT1 = 5012
PORT2 = 5013
AUTHKEY = 'authkey'
def QueueServerClient(HOST, PORT, AUTHKEY):
class QueueManager(SyncManager):
pass
QueueManager.register('get_queue')
QueueManager.register('get_name')
QueueManager.register('get_description')
manager = QueueManager(address = (HOST, PORT), authkey = AUTHKEY)
manager.connect() # This starts the connected client
return manager
# create three connected managers
qc0 = QueueServerClient(HOST, PORT0, AUTHKEY)
qc1 = QueueServerClient(HOST, PORT1, AUTHKEY)
qc2 = QueueServerClient(HOST, PORT2, AUTHKEY)
# Get the queue objects from the clients
q0 = qc0.get_queue()
q1 = qc1.get_queue()
q2 = qc2.get_queue()
# put stuff in the queues
q0.put('some stuff')
q1.put('other stuff')
q2.put({1:123, 2:'abc'})
# check their sizes
print 'q0 size', q0.qsize()
print 'q1 size', q1.qsize()
print 'q2 size', q2.qsize()
# pull some stuff and print it
print q0.get()
print q1.get()
print q2.get()
添加一个额外的服务器来与正在运行的队列服务器的信息共享一个字典,这样消费者就可以很容易地分辨出可用的地方,使用该模型就足够容易了。不过,需要注意的一件事是,共享字典需要的语法与普通字典略有不同:dictionary[0] = something
将不起作用。您需要使用 dictionary.update([(key, value), (otherkey, othervalue)])
和 dictionary.get(key)
语法,它传播到所有连接到该字典的其他客户端..
关于Python multiprocessing RemoteManager 下的一个 multiprocessing.Process,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11532654/
我正在尝试使用多处理和队列实现生产者-消费者场景;主进程是生产者,两个子进程使用队列中的数据。这在没有任何异常 发生的情况下有效,但问题是我希望能够在工作人员死亡时重新启动他们(kill -9 wor
我试图在一个管理进程下启动一个数据队列服务器(这样它以后可以变成一个服务),虽然数据队列服务器功能在主进程中工作正常,但它在一个进程中不起作用使用 multiprocessing.Process 创建
我的多处理需求非常简单:我从事机器学习工作,有时我需要评估多个数据集中的一个算法,或者一个数据集中的多个算法,等等。我只需要运行一个带有一些参数的函数并获取一个数字。 我不需要 RPC、共享数据,什么
创建进程池或简单地遍历一个进程以创建更多进程之间有任何区别(以任何方式)吗? 这有什么区别?: pool = multiprocessing.Pool(5) pool.apply_async(work
multiprocessing.BoundedSemaphore(3) 与 multiprocessing.Sempahore(3) 有何不同? 我希望 multiprocessing.Bounded
我尝试通过 multiprocessing 包中的 Queue 对 Pipe 的速度进行基准测试。我认为 Pipe 会更快,因为 Queue 在内部使用 Pipe。 奇怪的是,Pipe 在发送大型 n
我有这样一个简单的任务: def worker(queue): while True: try: _ = queue.get_nowait()
我正在尝试编写一个与 multiprocessing.Pool 同时应用函数的应用程序。我希望这个函数成为一个实例方法(所以我可以在不同的子类中以不同的方式定义它)。这似乎是不可能的;正如我在其他地方
在 python 2 中,multiprocessing.dummy.Pool 和 multiprocessing.pool.ThreadPool 之间有什么区别吗?源代码似乎暗示它们是相同的。 最佳
我正在开发一个用于财务目的的模型。我将整个 S&P500 组件放在一个文件夹中,存储了尽可能多的 .hdf 文件。每个 .hdf 文件都有自己的多索引(年-周-分)。 顺序代码示例(非并行化): im
到目前为止,我是这样做的: rets=set(pool.map_async(my_callback, args.hosts).get(60*4)) 如果超时,我会得到一个异常: File "/usr
参见下面的示例和执行结果: #!/usr/bin/env python3.4 from multiprocessing import Pool import time import os def in
我的任务是监听 UDP 数据报,对其进行解码(数据报具有二进制信息),将解码后的信息放入字典中,将字典转储为 json 字符串,然后将 json 字符串发送到远程服务器(ActiveMQ)。 解码和发
我在 macOS 上工作,最近被 Python 3.8 多处理中“fork”到“spawn”的变化所困扰(参见 doc )。下面显示了一个简化的工作示例,其中使用“fork”成功但使用“spawn”失
multiprocessing.Queue 的文档指出从项目入队到其腌制表示刷新到底层管道之间存在一点延迟。显然,您可以将一个项目直接放入管道中(它没有说明其他情况,并且暗示情况就是如此)。 为什么管
我运行了一些测试代码来检查在 Linux 中使用 Pool 和 Process 的性能。我正在使用 Python 2.7。 multiprocessing.Pool 的源代码似乎显示它正在使用 mul
我在 Windows Standard Embedded 7 上运行 python 3.4.3。我有一个继承 multiprocessing.Process 的类。 在类的 run 方法中,我为进程对
我知道multiprocessing.Process类似于 threading.Thread当我子类 multiprocessing.Process 时要创建一个进程,我发现我不必调用 __init_
我有教科书声明说在多处理器系统中不建议禁用中断,并且会花费太多时间。但我不明白这一点,谁能告诉我多处理器系统禁用中断的过程?谢谢 最佳答案 在 x86(和其他架构,AFAIK)上,启用/禁用中断是基于
我正在执行下面的代码并且它工作正常,但它不会产生不同的进程,而是有时所有都在同一个进程中运行,有时 2 个在一个进程中运行。我正在使用 4 cpu 机器。这段代码有什么问题? def f(values
我是一名优秀的程序员,十分优秀!