
Python multiprocessing queue error

Reposted. Author: 太空宇宙. Updated: 2023-11-04 10:31:48

I have this Python code that reads a file, does some processing, and writes the results in parallel:

def line_chunker(path):
    """
    Reads a file in chunks and yields each chunk.
    Each chunk is guaranteed to end at a carriage return (EOL).
    Each chunk is returned a single string.

    The number of chunks the file is split into is equal to the number of CPU cores
    available
    """
    size = os.path.getsize(path)
    cores = mp.cpu_count()
    chunksize = size/cores # gives truncated integer

    f = open(path)
    s = f.readline() # skip header
    while True:
        part = f.readlines(chunksize)
        if not part:
            f.close()
            break
        else:
            yield "".join(part)
    f.close()

def _validate(chunk, outq):
    """ Performs format validation on a given chunk of a csv file """
    rows = csv.reader(StringIO(chunk))
    vld = validation.RowValidator(rows)
    vld.check_rows()
    outq.put(vld.errors)

def _write(outq):
    """Writes lines in the outq to a text file """
    outfile = open("C:/testoutput.txt", "w")
    while True:
        result = outq.get()
        if result is None:
            outfile.close()
            break
        else:
            for line in result:
                outfile.write(line)
                outfile.write("\n")

def validate_pll(path):
    """ Perform validation in parallel """

    pool = mp.Pool()
    outq = mp.Manager().Queue(maxsize = 8)

    writer = mp.Process(target = _write, args = (outq,))
    writer.start()
    for chunk in line_chunker(path):
        pool.apply_async(_validate, (chunk, outq))

    pool.close()
    pool.join()

It reads the file in chunks and, for each chunk, starts a process to do the processing. The results are put on a queue, which is watched by another process.

The code runs, but after it completes I get a weird EOFError.

I suspect it is because I do not call writer.join(), but if I add that line, like so:

def validate_pll(path):
    """ Perform validation in parallel """

    pool = mp.Pool()
    outq = mp.Manager().Queue(maxsize = 8)

    writer = mp.Process(target = _write, args = (outq,))
    writer.start()
    for chunk in line_chunker(path):
        pool.apply_async(_validate, (chunk, outq))

    pool.close()
    pool.join()
    writer.join()

the code simply hangs. Any idea what I am doing wrong?

The error message given is:

Process Process-10:
Traceback (most recent call last):
  File C\Anaconda\lib\multiprocessing\process.py, line 258, in _bootstrap
    self.run()
  File C\Anaconda\lib\multiprocessing\process.py line 114, in run
    self._target(*self._args, **self._kwargs)
  File C:\SVN\PortfolioInspector\trunk\parallel.py, line 114 in _write
    result = outq.get()
  File "(string)", line 2, in get
  File C\Anaconda\lib\multiprocessing\managers.py, line 759, in _callmethod
    kind, result = conn.recv()
EOFError

Best answer

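When the main process ends, the writer process (the one running _write) is still waiting for entries to be put on outq. It waits by holding open a blocking connection to the Manager process that manages the shared Queue. Once the main process finishes executing, the Manager process shuts down, which sends EOF to the connection the writer opened, and you see that exception.

To fix it, you need to tell the writer to shut down before the main process ends (and, by extension, before the Manager process shuts down). You actually already have a mechanism in place for this; you are just not using it. Put None on outq and _write will shut down in an orderly way. Do that after pool.join() and before writer.join(), and things should work fine. This also explains the hang: without the None, writer.join() waits forever on a writer that is itself blocked in outq.get().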

Regarding this Python multiprocessing queue error, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/25994201/
