- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我想创建一个在 While True 循环中永远运行的线程或进程。
我需要以队列的形式向工作人员发送和接收数据,可以是 multiprocessing.Queue() 或 collections.deque()。我更喜欢使用 collections.deque(),因为它明显更快。
我还需要能够最终杀死工作人员(因为它在 while True 循环中运行。这是我放在一起的一些测试代码,用于尝试理解 Threads、Processes、Queues 和 deque 之间的区别。 .
import time
from multiprocessing import Process, Queue
from threading import Thread
from collections import deque
class ThreadingTest(Thread):
def __init__(self, q):
super(ThreadingTest, self).__init__()
self.q = q
self.toRun = False
def run(self):
print("Started Thread")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Thread deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Thread Queue: " + str(i))
def stop(self):
print("Trying to stop Thread")
self.toRun = False
while self.isAlive():
time.sleep(0.1)
print("Stopped Thread")
class ProcessTest(Process):
def __init__(self, q):
super(ProcessTest, self).__init__()
self.q = q
self.toRun = False
self.ctr = 0
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Process deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Process Queue: " + str(i))
def stop(self):
print("Trying to stop Process")
self.toRun = False
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
if __name__ == '__main__':
q = Queue()
t1 = ProcessTest(q)
t1.start()
for i in range(10):
if type(q) == type(deque()):
q.append(i)
elif type(q) == type(Queue()):
q.put_nowait(i)
time.sleep(1)
t1.stop()
t1.join()
if type(q) == type(deque()):
print(q)
elif type(q) == type(Queue()):
while q.qsize() > 0:
print(str(q.get_nowait()))
如您所见,t1 可以是 ThreadingTest 或 ProcessTest。此外,传递给它的队列可以是 multiprocessing.Queue 或 collections.deque。
ThreadingTest 使用 Queue 或 deque()。它还会在调用 stop() 方法时正确终止 run()。
Started Thread
Thread deque: 0
Thread deque: 1
Thread deque: 2
Thread deque: 3
Thread deque: 4
Thread deque: 5
Thread deque: 6
Thread deque: 7
Thread deque: 8
Thread deque: 9
Trying to stop Thread
Stopped Thread
deque([])
ProcessTest 只能从 multiprocessing.Queue 类型的队列中读取。它不适用于 collections.deque。此外,我无法使用 stop() 终止进程。
Process Queue: 0
Process Queue: 1
Process Queue: 2
Process Queue: 3
Process Queue: 4
Process Queue: 5
Process Queue: 6
Process Queue: 7
Process Queue: 8
Process Queue: 9
Trying to stop Process
我想弄清楚为什么?另外,将 deque 与进程一起使用的最佳方法是什么?而且,我将如何使用某种 stop() 方法终止进程。
最佳答案
您不能使用 collections.deque
在两个 multiprocessing.Process
实例之间传递数据,因为 collections.deque
不是进程-意识到的。 multiprocessing.Queue
在内部将其内容写入 multiprocessing.Pipe
,这意味着其中的数据可以在一个进程中入队并在另一个进程中检索。 collections.deque
没有那种管道,所以它不会工作。当您在一个进程中写入deque
时,另一个进程中的deque
实例不会受到任何影响;它们是完全独立的实例。
您的 stop()
方法也发生了类似的问题。您正在主进程中更改 toRun
的值,但这根本不会影响子进程。它们是完全独立的实例。结束 child 的最好方法是向 Queue
发送一些哨兵。当拿到child中的sentinel时,跳出死循环:
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Process deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
if i is None:
break # Got sentinel, so break
print("Process Queue: " + str(i))
def stop(self):
print("Trying to stop Process")
self.q.put(None) # Send sentinel
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
编辑:
如果你真的需要两个进程之间的deque
语义,你可以使用custom multiprocessing.Manager()
在 Manager
进程中创建一个共享的 deque
,并且您的每个 Process
实例都将获得一个 Proxy
:
import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from collections import deque
SyncManager.register('deque', deque)
def Manager():
m = SyncManager()
m.start()
return m
class ProcessTest(Process):
def __init__(self, q):
super(ProcessTest, self).__init__()
self.q = q
self.ctr = 0
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if self.q._getvalue():
i = self.q.popleft()
if i is None:
break
print("Process deque: " + str(i))
def stop(self):
print("Trying to stop Process")
self.q.append(None)
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
if __name__ == '__main__':
m = Manager()
q = m.deque()
t1 = ProcessTest(q)
t1.start()
for i in range(10):
q.append(i)
time.sleep(1)
t1.stop()
t1.join()
print(q)
请注意,这可能不会比 multiprocessing.Queue
快,因为每次访问 deque
都会产生 IPC 成本。对于按照您的方式传递消息,它也是一种不太自然的数据结构。
关于python - 关于使用 Queue()/deque() 和类变量进行通信和 "poison pill"的进程与线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27345332/
这个问题在这里已经有了答案: Is using std::deque or std::priority_queue thread-safe? [duplicate] (3 个答案) Thread s
我是一名学生,我的操作系统类(class)项目有一点问题,这对于作业规范本身来说有点多余: 虽然我可以将 100 万个双端队列推送到我的双端队列中,但我无法推送约 1000 万或更多。 现在,在实际的
std::deque 在 CppReference 中有很好的记录,但是 boost::deque 的 documentation看起来和标准的一样,只是增加了一些方法,比如nth和index_of。
Scala 是否具有类似于 Java Deque 或 Python deque 的双面队列? 我在 Scala 2.12 API 中只看到 Stack 和 Queue 但只想仔细检查一下。 最佳答案
好的,我正在实现一个与 Apple 的日历应用程序非常相似的日历。我的 UICollectionView 中有多种类型的单元格。我有垂直线单元格、水平线单元格、现在线单元格和事件单元格。 每当我滚动时
目前在我的项目中我有两个静态方法PushObjects 和ProcessObject。 PushObject 方法将数据推回静态双端队列,此方法可由多个线程访问,但 ProcessObject 始终由
使用 http://www.cppreference.com/wiki/stl/deque/insert作为引用,我在某些位置将值插入双端队列。 例如,如果双端队列 A 是: a, b, d, e,
Python 的 collections.deque有一个 maxlen 参数,这样 [...] the deque is bounded to the specified maximum lengt
这是我的第一个问题,希望大家都好。我必须编写双端队列或双端队列的数组实现,但在理解前面方法的入队元素时遇到了一些麻烦,我通过摆弄了一下让它工作,但我仍然很难理解逻辑: void addAtFront(
我试图解决 Java Deque 上的 HackerRank 问题。除了具有 100,000 个输入的案例之外,我的代码通过了所有案例。 问题:在这个问题中,给你 N 个整数。您需要在大小为 M 的所
我正在编写 Deque 以便能够从 Front 和 Rear 中添加和删除....我认为我的逻辑是错误的,但我无法弄清楚!因为当我从前面插入时,它必须从后面移除,但也从前面移除。 你会检查我的代码并帮
我在使用 deque 容器时有些困惑。 我将 vector 与 deque 进行了比较,我动态地输入了 Integer 值并观察到在几次插入之后 vector 的对象开始四处移动并且地址已被更改,这似
由于我对内存有严格的要求,因此在从双端队列的开头删除一个范围后,我尝试使用deque::shrink_to_fit。但是,它不起作用,我只看到libstdc++使用带有副本的swap技巧实现了shri
我需要为继承到 deque 的类添加功能,但更喜欢看到 collections.deque 中的代码来实现一个新类 final。 >>> from _collections import deque,
本文实例讲述了python3 deque 双向队列创建与使用方法。分享给大家供大家参考,具体如下: 创建双向队列 ?
考虑以下 C++ 程序: #include #include using namespace std; int main() { deque d(30000000); cout
尝试使用 Deque 数据结构来回答一个编程问题,以查找乘积小于目标的所有子数组。 如上所述,我想使用 Deque 数据结构。我查看了用法,认为我做对了,但是使用了 const Deque = req
双端队列实现我实现了一个通用的 Deque 数据结构。 请检查此实现 这个错误对我来说没有意义,请告诉我一点信息 import java.util.NoSuchElementException; im
我需要编写自己的 Deque 类,并且必须使用双向链表实现来存储数据。问题是编写方法pushfromLeft(Thing thing),它将插入到双端队列的左侧。以下是我迄今为止所拥有的,但似乎不起作
标准说: A deque is a sequence container that supports random access iterators (27.2.7). In addition, it
我是一名优秀的程序员,十分优秀!