gpt4 book ai didi

python - python中的线程——同时处理多个大文件

转载 作者:太空宇宙 更新时间:2023-11-03 13:16:14 24 4
gpt4 key购买 nike

我是 Python 的新手,我无法理解线程的工作原理。通过浏览文档,我的理解是在线程上调用 join() 是阻塞直到它完成的推荐方法。

为了提供一点背景知识,我尝试解析 48 个大型 csv 文件(多个 GB)以找出不一致之处。线程不共享任何状态。这可以在合理的时间内以单线程方式一次性完成,但我正在尝试同时进行练习。

这是文件处理的框架:

def process_file(data_file):
with open(data_file) as f:
print "Start processing {0}".format(data_file)
line = f.readline()
while line:
# logic omitted for brevity; can post if required
# pretty certain it works as expected, single 'thread' works fine
line = f.readline()

print "Finished processing file {0} with {1} errors".format(data_file, error_count)

def process_file_callable(data_file):
try:
process_file(data_file)
except:
print >> sys.stderr, "Error processing file {0}".format(data_file)

并发位:

def partition_list(l, n):
""" Yield successive n-sized partitions from a list.
"""
for i in xrange(0, len(l), n):
yield l[i:i+n]

partitions = list(partition_list(data_files, 4))
for partition in partitions:
threads = []
for data_file in partition:
print "Processing file {0}".format(data_file)
t = Thread(name=data_file, target=process_file_callable, args = (data_file,))
threads.append(t)
t.start()

for t in threads:
print "Joining {0}".format(t.getName())
t.join(5)

print "Joined the first chunk of {0}".format(map(lambda t: t.getName(), threads))

我将其运行为:

python -u datautils/cleaner.py > cleaner.out 2> cleaner.err

我的理解是 join() 应该阻止调用线程等待调用它的线程完成,但是我观察到的行为与我的预期不一致。

我从未在错误文件中看到错误,但我也从未在标准输出上看到预期的日志消息。

父进程不会终止,除非我从 shell 中明确终止它。如果我检查我为 Finished ... 打印了多少,它永远不会是预期的 48,而是在 12 到 15 之间。但是,单线程运行后,我可以确认多线程运行是实际上处理所有事情并进行所有预期的验证,只是它似乎没有干净地终止。

我知道我一定做错了什么,但如果您能指出正确的方向,我将不胜感激。

最佳答案

我不明白你的代码哪里出错了。但我可以建议你稍微重构一下。首先,python 中的线程根本不是并发的。这只是幻觉,因为有一个 Global Interpreter Lock ,所以同一时间只能执行一个线程。这就是为什么我建议您使用 multiprocessing module :

from multiprocessing import Pool, cpu_count
pool = Pool(cpu_count)
for partition in partition_list(data_files, 4):
res = pool.map(process_file_callable, partition)
print res

其次,您使用的不是 pythonic 方式来读取文件:

with open(...) as f:
line = f.readline()
while line:
... # do(line)
line = f.readline()

这里是pythonic方式:

with open(...) as f:
for line in f:
... # do(line)

This is memory efficient, fast, and leads to simple code. (c) PyDoc

顺便说一下,我只有一个假设,你的程序在多线程方式下会发生什么——应用程序变得更慢,因为对硬盘驱动器的无序访问比有序访问要慢得多。如果您使用的是 Linux,您可以尝试使用 iostathtop 来验证这个假设。

如果您的应用没有完成工作,并且它在进程监视器中没有执行任何操作(CPU 或磁盘未激活),则意味着您遇到了某种死锁或对同一资源的访问受阻。

关于python - python中的线程——同时处理多个大文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29291270/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com