gpt4 book ai didi

Python Multiprocessing - 进程数

转载 作者:行者123 更新时间:2023-12-05 02:23:01 26 4
gpt4 key购买 nike

我正在执行下面的代码并且它工作正常,但它不会产生不同的进程,而是有时所有都在同一个进程中运行,有时 2 个在一个进程中运行。我正在使用 4 cpu 机器。这段代码有什么问题?

def f(values):
print(multiprocessing.current_process())
return values

def main():
p = Pool(4) #number of processes = number of CPUs
keys, values= zip(*data.items()) #ordered keys and values
processed_values= p.map( f, values )
result= dict( zip(keys, processed_values ) )
p.close() # no more tasks
p.join() # wrap up current tasks

结果是

<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>

有时像这样,

<SpawnProcess(SpawnPoolWorker-3, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-3, started daemon)>

有时候,

<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-4, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>

我的问题是,它将职能分配给 worker 的依据是什么?我正在编写代码,它根据字典中的键数决定进程数(考虑到我的数据总是比我的 CPU 具有更少的键)。我的代码将开始 - 主代码读取文件并使用单个进程从中创建字典,并将其分支到并发进程的数量并等待它们处理数据(我为此使用 pool.map),然后一旦它得到子进程的结果,它就开始处理它们。我怎样才能实现这个父进程等待子进程步骤?

最佳答案

您的代码没有任何问题。您的工作项目非常快 - 如此之快,以至于同一个工作进程可以运行该函数,返回结果,然后赢得竞争,从 multiprocessing.Pool< 的内部队列中消耗下一个任务 用于分配工作。当您调用 map 时,工作项被分成批处理并放入 Queue 中。以下是 pool.map 实现的一部分,它将 iterable 中的项目分 block 并将它们放入队列中:

    task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), None))

每个工作进程运行一个函数,该函数具有一个无限循环,该循环消耗该队列中的项目*:

while maxtasks is None or (maxtasks and completed < maxtasks):
try:
task = get() # Pulls an item off the taskqueue
except (EOFError, IOError):
debug('worker got EOFError or IOError -- exiting')
break

if task is None:
debug('worker got sentinel -- exiting')
break

job, i, func, args, kwds = task
try:
result = (True, func(*args, **kwds)) # Runs the function you passed to map
except Exception, e:
result = (False, e)
try:
put((job, i, result)) # Sends the result back to the parent
except Exception as e:
wrapped = MaybeEncodingError(e, result[1])
debug("Possible encoding error while sending result: %s" % (
wrapped))

很可能是同一个 worker 碰巧能够消费一个项目,运行 func,然后消费下一个项目。这有点奇怪 - 我无法在我的机器上重现它运行与你的示例相同的代码 - 但是让同一个工作人员从队列中抓取四个项目中的两个是很正常的。

如果您通过插入对 time.sleep 的调用使您的 worker 函数花费更长的时间,您应该始终看到均匀分布:

def f(values):
print(multiprocessing.current_process())
time.sleep(1)
return values

* 这实际上不完全正确 - 在主进程中运行的线程从 taskqueue 消费,然后将它拉出的内容放入另一个 Queue这就是子进程消费的内容)

关于Python Multiprocessing - 进程数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25902373/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com