Python multiprocessing in a function - memory usage increases with every function call

Reposted. Author: 行者123. Updated: 2023-11-28 19:23:39

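I am processing a large amount of data in parallel, and the first run works fine. However, when I wrap my program in a function and call it several times with different arguments (for example, so that it only processes a particular year), memory usage first doubles, then triples, and so on, until my PC runs out of memory.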

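I'm not sure what is going on, but when I run the following minimal example of what I'm doing, I get strange output from the multiprocessing logger. Basically, if I call the calc() function n times, the logger shows every line of output n times.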

from __future__ import print_function  # print() calls below also work on Python 3

import logging
import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('%s: %s' % (proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)

    def __str__(self):
        return '%s * %s' % (self.a, self.b)


def calc():
    # Attaches a new stderr handler to the multiprocessing logger on every call
    multiprocessing.log_to_stderr(logging.DEBUG)

    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = 1
    print('Creating %d consumers' % num_consumers)
    consumers = [Consumer(tasks, results) for i in range(num_consumers)]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 3
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1


if __name__ == '__main__':
    calc()
    print('--------------------------------------------')
    print('RUNNING SECOND TIME ALL CALLS ARE DUPLICATED')
    print('--------------------------------------------')
    calc()

The logger output is:

[DEBUG/MainProcess] created semlock with handle 140730532954112
[DEBUG/MainProcess] created semlock with handle 140730532921344
[DEBUG/MainProcess] created semlock with handle 140730532888576
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] created semlock with handle 140730532855808
[DEBUG/MainProcess] created semlock with handle 140730532823040
[DEBUG/MainProcess] created semlock with handle 140730532790272
[DEBUG/MainProcess] created semlock with handle 140730532757504
[DEBUG/MainProcess] created semlock with handle 140730532724736
[DEBUG/MainProcess] created semlock with handle 140730494124032
[DEBUG/MainProcess] created semlock with handle 140730494091264
[DEBUG/MainProcess] created semlock with handle 140730494058496
[DEBUG/MainProcess] Queue._after_fork()
Creating 1 consumers
[DEBUG/MainProcess] Queue._start_thread()
[DEBUG/MainProcess] doing self._thread.start()
[DEBUG/Consumer-1] Queue._after_fork()
[DEBUG/Consumer-1] Queue._after_fork()
[INFO/Consumer-1] child process calling self.run()
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()
Consumer-1: 0 * 0
[DEBUG/Consumer-1] Queue._start_thread()
[DEBUG/Consumer-1] doing self._thread.start()
[DEBUG/Consumer-1] starting thread to feed data to pipe
[DEBUG/Consumer-1] ... done self._thread.start()
Consumer-1: 1 * 1
Consumer-1: 2 * 2
Consumer-1: Exiting
[INFO/Consumer-1] process shutting down
[DEBUG/Consumer-1] running all "atexit" finalizers with priority >= 0
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
--------------------------------------------
RUNNING SECOND TIME ALL CALLS ARE DUPLICATED
--------------------------------------------
[DEBUG/Consumer-1] telling queue thread to quit
[DEBUG/Consumer-1] running the remaining "atexit" finalizers
[DEBUG/Consumer-1] joining queue thread
[DEBUG/Consumer-1] feeder thread got sentinel -- exiting
[DEBUG/MainProcess] created semlock with handle 140730485637120
[DEBUG/MainProcess] created semlock with handle 140730485637120
[DEBUG/MainProcess] created semlock with handle 140730485604352
[DEBUG/MainProcess] created semlock with handle 140730485604352
[DEBUG/Consumer-1] ... queue thread joined
[DEBUG/MainProcess] created semlock with handle 140730485571584
[DEBUG/MainProcess] created semlock with handle 140730485571584
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] Queue._after_fork()
[INFO/Consumer-1] process exiting with exitcode 0
[DEBUG/MainProcess] created semlock with handle 140730485538816
[DEBUG/MainProcess] created semlock with handle 140730485538816
[DEBUG/MainProcess] created semlock with handle 140730485506048
[DEBUG/MainProcess] created semlock with handle 140730485506048
[DEBUG/MainProcess] created semlock with handle 140730485473280
[DEBUG/MainProcess] created semlock with handle 140730485473280
[DEBUG/MainProcess] created semlock with handle 140730485440512
[DEBUG/MainProcess] created semlock with handle 140730485440512
[DEBUG/MainProcess] created semlock with handle 140730485407744
[DEBUG/MainProcess] created semlock with handle 140730485407744
[DEBUG/MainProcess] created semlock with handle 140730485374976
[DEBUG/MainProcess] created semlock with handle 140730485374976
[DEBUG/MainProcess] created semlock with handle 140730485342208
[DEBUG/MainProcess] created semlock with handle 140730485342208
[DEBUG/MainProcess] created semlock with handle 140730485309440
[DEBUG/MainProcess] created semlock with handle 140730485309440
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] Queue._after_fork()
Creating 1 consumers
[DEBUG/MainProcess] telling queue thread to quit
[DEBUG/MainProcess] telling queue thread to quit
[DEBUG/MainProcess] Queue._start_thread()
[DEBUG/Consumer-2] Queue._after_fork()
[DEBUG/MainProcess] Queue._start_thread()
[DEBUG/Consumer-2] Queue._after_fork()
[DEBUG/Consumer-2] Queue._after_fork()
[DEBUG/Consumer-2] Queue._after_fork()
[DEBUG/MainProcess] doing self._thread.start()
[INFO/Consumer-2] child process calling self.run()
[DEBUG/MainProcess] doing self._thread.start()
[INFO/Consumer-2] child process calling self.run()
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()
[DEBUG/MainProcess] ... done self._thread.start()
Consumer-2: 0 * 0
[DEBUG/MainProcess] feeder thread got sentinel -- exiting
[DEBUG/MainProcess] feeder thread got sentinel -- exiting
[DEBUG/MainProcess] joining queue thread
[DEBUG/MainProcess] joining queue thread
[DEBUG/MainProcess] ... queue thread already dead
[DEBUG/MainProcess] ... queue thread already dead
[DEBUG/Consumer-2] Queue._start_thread()
[DEBUG/Consumer-2] Queue._start_thread()
[DEBUG/Consumer-2] doing self._thread.start()
[DEBUG/Consumer-2] doing self._thread.start()
[DEBUG/Consumer-2] starting thread to feed data to pipe
[DEBUG/Consumer-2] starting thread to feed data to pipe
[DEBUG/Consumer-2] ... done self._thread.start()
[DEBUG/Consumer-2] ... done self._thread.start()
Consumer-2: 1 * 1
Consumer-2: 2 * 2
Consumer-2: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
[INFO/Consumer-2] process shutting down
[INFO/Consumer-2] process shutting down
[DEBUG/Consumer-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Consumer-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Consumer-2] telling queue thread to quit
[DEBUG/Consumer-2] telling queue thread to quit
[INFO/MainProcess] process shutting down
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] telling queue thread to quit
[DEBUG/MainProcess] telling queue thread to quit
[INFO/MainProcess] calling join() for process Consumer-2
[DEBUG/Consumer-2] running the remaining "atexit" finalizers
[INFO/MainProcess] calling join() for process Consumer-2
[DEBUG/MainProcess] feeder thread got sentinel -- exiting
[DEBUG/MainProcess] feeder thread got sentinel -- exiting
[DEBUG/Consumer-2] running the remaining "atexit" finalizers
[DEBUG/Consumer-2] feeder thread got sentinel -- exiting
[DEBUG/Consumer-2] feeder thread got sentinel -- exiting
[DEBUG/Consumer-2] joining queue thread
[DEBUG/Consumer-2] joining queue thread
[DEBUG/Consumer-2] ... queue thread joined
[DEBUG/Consumer-2] ... queue thread joined
[INFO/Consumer-2] process exiting with exitcode 0
[INFO/Consumer-2] process exiting with exitcode 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
[DEBUG/MainProcess] running the remaining "atexit" finalizers
[DEBUG/MainProcess] joining queue thread
[DEBUG/MainProcess] joining queue thread
[DEBUG/MainProcess] ... queue thread joined
[DEBUG/MainProcess] ... queue thread joined

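Do I have to initialize the multiprocessing environment somehow, or is it simply not possible to do this in a loop in the main process? I'm using Ubuntu 12.04 and Python 2.7.5.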
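A note on the duplicated log lines: multiprocessing.log_to_stderr() attaches a new StreamHandler to the multiprocessing logger every time it is called. Because the example calls it inside calc(), the second call leaves two handlers on the logger, so every log record is printed twice (n times after n calls). The print() output, such as "Consumer-2: 0 * 0" and the "Result:" lines, appears only once, so the tasks themselves are not being run twice; the repeated lines are a logging artifact. The Python 3 sketch below shows the handler count growing and one way to guard against it. The helper setup_logging_once() is hypothetical, not part of the multiprocessing API; the simplest fix is to call log_to_stderr() once, before the loop, in the if __name__ == '__main__': block.

```python
import logging
import multiprocessing


def handler_count():
    # Number of handlers currently attached to the "multiprocessing" logger.
    return len(multiprocessing.get_logger().handlers)


def setup_logging_once(level=logging.DEBUG):
    # Hypothetical helper: only call log_to_stderr() if the multiprocessing
    # logger has no handler yet, so repeated calls don't stack handlers.
    logger = multiprocessing.get_logger()
    if not logger.handlers:
        multiprocessing.log_to_stderr(level)
    return logger


if __name__ == "__main__":
    before = handler_count()
    # Calling log_to_stderr() twice, as two calls to calc() would...
    multiprocessing.log_to_stderr(logging.WARNING)
    multiprocessing.log_to_stderr(logging.WARNING)
    # ...leaves two extra handlers, so each record would be printed twice.
    print("extra handlers:", handler_count() - before)

    # Remove them again, then use the guarded helper instead.
    logger = multiprocessing.get_logger()
    for h in list(logger.handlers):
        logger.removeHandler(h)
    setup_logging_once(logging.WARNING)
    setup_logging_once(logging.WARNING)
    print("handlers after guarded setup:", handler_count())
```

Setting the level to WARNING in the demo only keeps it quiet; the handler behaviour is the same at DEBUG.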

Best answer

Try adding these lines to the end of calc():

for w in consumers:
    w.join()

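Calling join() on your JoinableQueue blocks until every item in the queue has been consumed and marked done, but it does not guarantee that the child processes have exited and been cleaned up. I suspect some objects in your child processes are lingering in memory because the processes were never joined.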
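A minimal Python 3 sketch of the suggested fix, assuming the goal is to call the same function repeatedly (for example once per year). It replaces the Consumer/Task classes with a plain worker function and uses a placeholder square() in place of the real work; both names are illustrative. The important part is the order at the end of calc(): tasks.join() only waits until every queued item has been marked with task_done(), the results are drained next (the multiprocessing documentation warns that a process which has put items on a queue may not terminate until those items have been flushed to the pipe), and only then are the workers joined, so no process started by one call outlives it.

```python
import multiprocessing


def square(n):
    # Placeholder for the real per-item work (hypothetical).
    return n * n


def worker(task_queue, result_queue):
    # Consume tasks until the poison pill (None) arrives.
    while True:
        item = task_queue.get()
        if item is None:
            task_queue.task_done()
            break
        result_queue.put((item, square(item)))
        task_queue.task_done()


def calc(items, num_consumers=2):
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    consumers = [
        multiprocessing.Process(target=worker, args=(tasks, results))
        for _ in range(num_consumers)
    ]
    for p in consumers:
        p.start()

    for item in items:
        tasks.put(item)
    for _ in range(num_consumers):
        tasks.put(None)  # one poison pill per consumer

    # Returns once every item (and pill) has been marked task_done();
    # the worker processes may still be running at this point.
    tasks.join()

    # Drain the results before joining the workers.
    collected = dict(results.get() for _ in range(len(items)))

    # Wait for each worker to exit so nothing from this call lingers.
    for p in consumers:
        p.join()

    return collected


if __name__ == "__main__":
    for year in (2011, 2012, 2013):  # hypothetical: one call per year
        out = calc(list(range(5)))
        print(year, out, multiprocessing.active_children())
```

After each call, multiprocessing.active_children() should return an empty list, which is a quick way to check that no workers are left behind between calls. The if __name__ == "__main__": guard matters on platforms where child processes re-import the main module.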

Regarding "Python multiprocessing in a function - memory usage increases with every function call", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/18570067/
