gpt4 book ai didi

python - 关于使用 Queue()/deque() 和类变量进行通信和 "poison pill"的进程与线程

转载 作者:太空宇宙 更新时间:2023-11-03 12:59:35 25 4
gpt4 key购买 nike

我想创建一个在 While True 循环中永远运行的线程或进程。

我需要以队列的形式向工作人员发送和接收数据,可以是 multiprocessing.Queue() 或 collections.deque()。我更喜欢使用 collections.deque(),因为它明显更快。

我还需要能够最终杀死工作人员(因为它在 while True 循环中运行。这是我放在一起的一些测试代码,用于尝试理解 Threads、Processes、Queues 和 deque 之间的区别。 .

import time
from multiprocessing import Process, Queue
from threading import Thread
from collections import deque

class ThreadingTest(Thread):

def __init__(self, q):
super(ThreadingTest, self).__init__()
self.q = q
self.toRun = False

def run(self):
print("Started Thread")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Thread deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Thread Queue: " + str(i))

def stop(self):
print("Trying to stop Thread")
self.toRun = False
while self.isAlive():
time.sleep(0.1)
print("Stopped Thread")

class ProcessTest(Process):

def __init__(self, q):
super(ProcessTest, self).__init__()
self.q = q
self.toRun = False
self.ctr = 0

def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Process deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Process Queue: " + str(i))

def stop(self):
print("Trying to stop Process")
self.toRun = False
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")

if __name__ == '__main__':
q = Queue()
t1 = ProcessTest(q)
t1.start()

for i in range(10):
if type(q) == type(deque()):
q.append(i)
elif type(q) == type(Queue()):
q.put_nowait(i)
time.sleep(1)
t1.stop()
t1.join()

if type(q) == type(deque()):
print(q)
elif type(q) == type(Queue()):
while q.qsize() > 0:
print(str(q.get_nowait()))

如您所见,t1 可以是 ThreadingTest 或 ProcessTest。此外,传递给它的队列可以是 multiprocessing.Queue 或 collections.deque。

ThreadingTest 使用 Queue 或 deque()。它还会在调用 stop() 方法时正确终止 run()。

Started Thread
Thread deque: 0
Thread deque: 1
Thread deque: 2
Thread deque: 3
Thread deque: 4
Thread deque: 5
Thread deque: 6
Thread deque: 7
Thread deque: 8
Thread deque: 9
Trying to stop Thread
Stopped Thread
deque([])

ProcessTest 只能从 multiprocessing.Queue 类型的队列中读取。它不适用于 collections.deque。此外,我无法使用 stop() 终止进程。

Process Queue: 0
Process Queue: 1
Process Queue: 2
Process Queue: 3
Process Queue: 4
Process Queue: 5
Process Queue: 6
Process Queue: 7
Process Queue: 8
Process Queue: 9
Trying to stop Process

我想弄清楚为什么?另外,将 deque 与进程一起使用的最佳方法是什么?而且,我将如何使用某种 stop() 方法终止进程。

最佳答案

您不能使用 collections.deque 在两个 multiprocessing.Process 实例之间传递数据,因为 collections.deque 不是进程-意识到的。 multiprocessing.Queue 在内部将其内容写入 multiprocessing.Pipe,这意味着其中的数据可以在一个进程中入队并在另一个进程中检索。 collections.deque 没有那种管道,所以它不会工作。当您在一个进程中写入deque 时,另一个进程中的deque 实例不会受到任何影响;它们是完全独立的实例。

您的 stop() 方法也发生了类似的问题。您正在主进程中更改 toRun 的值,但这根本不会影响子进程。它们是完全独立的实例。结束 child 的最好方法是向 Queue 发送一些哨兵。当拿到child中的sentinel时,跳出死循环:

def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Process deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
if i is None:
break # Got sentinel, so break
print("Process Queue: " + str(i))

def stop(self):
print("Trying to stop Process")
self.q.put(None) # Send sentinel
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")

编辑:

如果你真的需要两个进程之间的deque语义,你可以使用custom multiprocessing.Manager()Manager 进程中创建一个共享的 deque,并且您的每个 Process 实例都将获得一个 Proxy :

import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from collections import deque

SyncManager.register('deque', deque)

def Manager():
m = SyncManager()
m.start()
return m

class ProcessTest(Process):
def __init__(self, q):
super(ProcessTest, self).__init__()
self.q = q
self.ctr = 0

def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if self.q._getvalue():
i = self.q.popleft()
if i is None:
break
print("Process deque: " + str(i))

def stop(self):
print("Trying to stop Process")
self.q.append(None)
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")

if __name__ == '__main__':
m = Manager()
q = m.deque()
t1 = ProcessTest(q)
t1.start()

for i in range(10):
q.append(i)
time.sleep(1)
t1.stop()
t1.join()

print(q)

请注意,这可能不会比 multiprocessing.Queue 快,因为每次访问 deque 都会产生 IPC 成本。对于按照您的方式传递消息,它也是一种不太自然的数据结构。

关于python - 关于使用 Queue()/deque() 和类变量进行通信和 "poison pill"的进程与线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27345332/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com