gpt4 book ai didi

python 多处理问题

转载 作者:行者123 更新时间:2023-12-01 04:44:57 24 4
gpt4 key购买 nike

我在使用进程和队列时遇到了一些问题。

当我运行以下代码时,目标函数只是从主队列中获取一个项目,并将其添加到特定于该进程的另一个队列中。

import sys
import multiprocessing
from Queue import Empty

# This is just taking a number from the queue
# and adding it to another queue
def my_callable(from_queue, to_queue):
while True:
try:
tmp = from_queue.get(0)
to_queue.put(tmp)
print to_queue
except Empty:
break

# Create a master queue and fill it with numbers
main_queue = multiprocessing.Queue()
for i in xrange(100):
main_queue.put(i)

all_queues = []
processes = []
# Create processes
for i in xrange(5):
# Each process gets a queue that it will put numbers into
queue = multiprocessing.Queue()
# Keep up with the queue we are creating so we can get it later
all_queues.append(queue)
# Pass in our master queue and the queue we are transferring data to
process = multiprocessing.Process(target=my_callable,
args=(main_queue, queue))
# Keep up with the processes
processes.append(process)

for thread in processes:
thread.start()

for thread in processes:
thread.join()

当目标函数打印正在使用的队列时,您会注意到几乎独占了一个队列。

如果您随后获取输出并打印它,您将看到大多数数字最终都位于单个队列中。

def queue_get_all(q):
items = []
maxItemsToRetreive = 100
for numOfItemsRetrieved in range(0, maxItemsToRetreive):
try:
if numOfItemsRetrieved == maxItemsToRetreive:
break
items.append(q.get_nowait())
except Empty, e:
break
return items

for tmp in all_queues:
print queue_get_all(tmp)

这是什么原因造成的?我的代码中是否应该做一些事情来平衡这些进程正在做的工作?

输出

[0, 2, 3, 4, 5, 6, 7, 8]
[1, 9, 10]
[11, 14, 15, 16]
[12, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
[13]

最佳答案

我认为你有两个问题:

def my_callable(from_queue, to_queue):
while True:
try:
tmp = from_queue.get(0)
to_queue.put(tmp)
print to_queue
except Empty:
break

来自 get 的文档:

Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the Queue.Empty exception (timeout is ignored in that case).

由于您将 0 作为第一个参数传递,因此它相当于 get(False)。这使得它成为非阻塞的,这意味着如果它无法立即获取值,它将引发一个 Empty 异常,这将结束您的工作进程。由于所有“工作”函数都是相同的,并且尝试同时从主队列中提取数据,因此有些函数可能无法立即获取值并会死掉。

.get()一个小的超时应该可以解决这个问题。

第二个问题是你的“工作”功能基本上需要零时间才能完成。使用 sleep(.2) 稍微暂停一下以模拟一些不平凡的工作,它将在工作人员之间分配:

def my_callable(from_queue, to_queue):
while True:
try:
tmp = from_queue.get(True, .1)
sleep(0.2)
to_queue.put(tmp)
except Empty:
break

编辑:

我忘了说,一般来说,这类问题最好不要依赖 .get() 的超时来表示队列结束。如果您使用某种类型的“队列结束”标记对象,并将其传递到队列中,告诉工作人员是时候退出了,那么您将获得更多控制权。这样你就可以让它们全部阻塞,等待新的输入或退出“命令”。

关于python 多处理问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29681261/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com