gpt4 book ai didi

Python 多处理池没有创建足够的进程

转载 作者:太空狗 更新时间:2023-10-29 20:31:11 24 4
gpt4 key购买 nike

我正在对 40GB 的数据进行计算。每个文件都是一个包含 json 行的压缩 gzip 文件。每个文件最多有 500,000 行,或大约 500MB。我有一个运行 128 个 CPU 和 1952 GB 内存的亚马逊实例。我要做的是尽快处理每个文件。

我正在使用这样的多处理池:

def initializeLock(l):

global lock
lock = l

if __name__ == '__main__':
directory = '/home/ubuntu/[directory_containing_files]/*.gz'
file_names = glob.glob(directory)

lock = Lock()
pool = Pool(initializer=initializeLock, initargs=(lock,))
pool.map(do_analysis, file_names)
pool.close()
pool.join()

我希望发生的情况是创建大量进程,并且每个进程处理一个文件。实际发生的是最初创建了 100 多个进程。此时我使用了大约 85% 的内存,这太棒了!然后每一个都完成。最终运行的进程数量下降到大约 10 个。此时我只使用了 5% 的内存。定期启动其他进程,但它永远不会恢复到运行 100 个左右。所以我有这个大 CPU 和所有这些空闲内存,但大多数时候我最多运行 10 个进程。

知道如何让它继续运行 100 个进程直到所有文件都完成吗?

编辑:

我向应用程序添加了一些日志记录。最初它加载 127 个进程,我认为这是因为我有 128 个 CPU,并且在加载进程时有一个正在使用。一些过程成功完成,结果被保存。然后在某个时候,除了少数正在运行的进程外,所有进程都结束了。当我查看有多少文件已完成时,127 个中只有 22 个是完整的。然后它只使用 5-10 个进程运行,所有这些都成功完成。我在想它可能会耗尽内存并崩溃。但为什么?我有那么多内存和那么多 CPU。

编辑 2:

所以我找到了问题所在。问题是我在 do_analysis 方法中设置了一个锁,所有进程大约在同一时间完成并等待释放锁。进程没有停止,它们正在休眠。所以这让我想到了另一个问题:我的主要目标是获取每个包含许多 json 行的文件,从 json 行获取 ID 属性,然后将其附加到包含具有相同 id 的其他行的文件。如果该文件不存在,我会创建它。我所做的是在访问文件时设置一个锁,以避免它被另一个进程访问。这是我的代码。

for key, value in dataframe.iteritems():
if os.path.isfile(file_name):
lock.acquire()
value.to_csv(filename), mode='a', header=False, encoding='utf-8')
lock.release()
else:
value.to_csv(filename), header=True, encoding='utf-8')

所以现在我正在尝试想出一种创造性的方法来附加到文件,但不阻止所有其他进程。我正在处理大量数据,需要同时访问两个文件的可能性很小,但它仍然会发生。所以我需要确保在附加文件时,另一个进程不会尝试打开该文件。

最佳答案

感谢大家的意见。这是我目前对这个问题的解决方案,我计划在接下来的一周内提高效率。我接受了 Martin 的建议,一旦文件全部完成,我就将它们粘合在一起,但是,我想努力实现 daphtdazz 解决方案,让一个进程在我生成更多文件的同时使用队列进行粘合。

def do_analyis(file):
# To keep the file names unique, I append the process id to the end
process_id = multiprocessing.current_process().pid

# doing analysis work...

for key, value in dataframe.iteritems():
if os.path.isfile(filename):
value.to_csv(filename), mode='a', header=False, encoding='utf-8')
else:
value.to_csv(filename), header=True, encoding='utf-8')

def merge_files(base_file_name):
write_directory = 'write_directory'
all_files = glob.glob('{0}*'.format(base_file_name))

is_file_created = False

for file in all_files:
if is_file_created:
print 'File already exists, appending'
dataframe = pandas.read_csv(file, index_col=0)
dataframe.to_csv('{0}{1}.csv'.format(write_directory, os.path.basename(base_file_name)), mode='a', header=False, encoding='utf-8')
else:
print 'File does not exist, creating.'
dataframe = pandas.read_csv(file, index_col=0)
dataframe.to_csv('{0}{1}.csv'.format(write_directory, os.path.basename(base_file_name)), header=True, encoding='utf-8')
is_file_created = True


if __name__ == '__main__':
# Run the code to do analysis and group files by the id in the json lines
directory = 'directory'
file_names = glob.glob(directory)
pool = Pool()
pool.imap_unordered(do_analysis, file_names, 1)
pool.close()
pool.join()

# Merge all of the files together
base_list = get_unique_base_file_names('file_directory')
pool = Pool()
pool.imap_unordered(merge_files, base_list, 100)
pool.close()
pool.join()

这会保存每个文件,并在文件末尾附加一个唯一的进程 ID,然后返回并通过 json 文件中的 ID 获取所有文件并将它们合并在一起。创建文件时,cpu 使用率在 60-70% 之间。那是体面的。合并文件时,cpu 使用率约为 8%。这是因为文件合并得如此之快,以至于我不需要我拥有的所有 CPU 处理能力。该解决方案有效。但它可能更有效率。我将努力同时完成这两项工作。欢迎任何建议。

关于Python 多处理池没有创建足够的进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40725622/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com