gpt4 book ai didi

python - 如何从 multiprocessing.queue 中的 Process 释放内存?

转载 作者:太空宇宙 更新时间:2023-11-03 17:39:37 27 4
gpt4 key购买 nike

我有一个程序试图预测我一周内发送的每封电子邮件(因此,通常发送 7 封)的电子邮件转换。输出是 7 个不同的文件,其中包含每个客户的预测分数。串行运行这些可能需要接近 8 个小时,因此我尝试使用 multiprocessing 并行化它们。这很好地加快了速度,但我注意到,一个进程完成后,它似乎会保留其内存,直到没有剩余内存,并且其中一个进程在没有完成其任务的情况下被系统杀死。

我基于 the 'manual pool' example in this answer 编写了以下代码,因为由于内存限制,我需要限制立即启动的进程数量。我想要的是,当一个进程完成时,它会将其内存释放给系统,为下一个工作进程释放空间。

下面是处理并发的代码:

def work_controller(in_queue, out_list):
while True:
key = in_queue.get()
print key

if key == None:
return

work_loop(key)
out_list.append(key)

if __name__ == '__main__':

num_workers = 4
manager = Manager()
results = manager.list()
work = manager.Queue(num_workers)
processes = []

for i in xrange(num_workers):
p = Process(target=work_controller, args=(work,results))
processes.append(p)
p.start()

iters = itertools.chain([key for key in training_dict.keys()])
for item in iters:
work.put(item)

for p in processes:
print "Joining Worker"
p.join()

这是实际的工作代码,如果有任何帮助的话:

def work_loop(key):
with open('email_training_dict.pkl','rb') as f:
training_dict = pickle.load(f)
df_test = pd.DataFrame.from_csv(test_file)
outdict = {}
target = 'is_convert'

df_train = train_dataframe(key)
features = data_cleanse(df_train,df_test)

# MAIN PREDICTION
print 'Start time: {}'.format(datetime.datetime.now()) + '\n'

# train/test by mailer
X_train = df_train[features]
X_test = df_test[features]
y_train = df_train[target]

# run model fit
clf = imbalance.ImbalanceClassifier()

clf = clf.fit(X_train, y_train)
y_hat = clf.predict(X_test)

outdict[key] = clf.y_vote
print outdict[key]
print 'Time Complete: {}'.format(datetime.datetime.now()) + '\n'
with open(output_file,'wb') as f:
pickle.dump(outdict,f)

最佳答案

我假设,就像您链接的示例一样,您正在使用 Queue.Queue() 作为队列对象。这是一个阻塞队列,这意味着对 queue.get() 的调用将返回一个元素,或者等待/阻塞直到它可以返回一个元素。尝试将您的 work_controller 函数更改为以下内容:

def work_controller(in_queue, out_list):
while True: # when the queue is empty return
try:
key = in_queue.get(False) # add False to not have the queue block
except Queue.Empty:
return
print key

work_loop(key)
out_list.append(key)

虽然上述解决了阻塞问题,但它又引起了另一个问题。在线程生命周期开始时,in_queue 中没有项目,因此线程将立即结束。

为了解决这个问题,我建议您添加一个标志来指示是否可以终止。

global ok_to_end # put this flag in a global space

def work_controller(in_queue, out_list):
while True: # when the queue is empty return
try:
key = in_queue.get(False) # add False to not have the queue block
except Queue.Empty:
if ok_to_end: # consult the flag before ending.
return
print key

work_loop(key)
out_list.append(key)

if __name__ == '__main__':

num_workers = 4
manager = Manager()
results = manager.list()
work = manager.Queue(num_workers)
processes = []

ok_to_end = False # termination flag
for i in xrange(num_workers):
p = Process(target=work_controller, args=(work,results))
processes.append(p)
p.start()

iters = itertools.chain([key for key in training_dict.keys()])
for item in iters:
work.put(item)

ok_to_end = True # termination flag set to True after queue is filled

for p in processes:
print "Joining Worker"
p.join()

关于python - 如何从 multiprocessing.queue 中的 Process 释放内存?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30767630/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com