
python - "Can' t pickle ”在 Windows 上使用多处理时出错


I am writing a multiprocessing program on Windows to process large .CSV files in parallel.

I found this excellent example for a similar problem. When running it under Windows, I get an error saying that csv.reader is not picklable.

I suppose I could open the CSV file in the reader subprocess and just send it the file name from the parent. However, I would like to pass an already opened CSV file (as the code is meant to do), with a specific state, i.e. really use a shared object.

Any idea how to do this under Windows, or what is missing here?

Here is the code (reposted for readability):

"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
"""Make the command line interface parser."""
usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
__doc__,
"""
ARGUMENTS:
INPUT_CSV: an input CSV file with rows of numbers
OUTPUT_CSV: an output file that will contain the sums\
"""])
cli_parser = optparse.OptionParser(usage)
cli_parser.add_option('-n', '--numprocs', type='int',
default=NUM_PROCS,
help="Number of processes to launch [DEFAULT: %default]")
return cli_parser

class CSVWorker(object):
def __init__(self, numprocs, infile, outfile):
self.numprocs = numprocs
self.infile = open(infile)
self.outfile = outfile
self.in_csvfile = csv.reader(self.infile)
self.inq = multiprocessing.Queue()
self.outq = multiprocessing.Queue()

self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
for i in range(self.numprocs)]

self.pin.start()
self.pout.start()
for p in self.ps:
p.start()

self.pin.join()
i = 0
for p in self.ps:
p.join()
print "Done", i
i += 1

self.pout.join()
self.infile.close()

def parse_input_csv(self):
"""Parses the input CSV and yields tuples with the index of the row
as the first element, and the integers of the row as the second
element.

The index is zero-index based.

The data is then sent over inqueue for the workers to do their
thing. At the end the input thread sends a 'STOP' message for each
worker.
"""
for i, row in enumerate(self.in_csvfile):
row = [ int(entry) for entry in row ]
self.inq.put( (i, row) )

for i in range(self.numprocs):
self.inq.put("STOP")

def sum_row(self):
"""
Workers. Consume inq and produce answers on outq
"""
tot = 0
for i, row in iter(self.inq.get, "STOP"):
self.outq.put( (i, sum(row)) )
self.outq.put("STOP")

def write_output_csv(self):
"""
Open outgoing csv file then start reading outq for answers
Since I chose to make sure output was synchronized to the input there
is some extra goodies to do that.

Obviously your input has the original row number so this is not
required.
"""
cur = 0
stop = 0
buffer = {}
# For some reason csv.writer works badly across threads so open/close
# and use it all in the same thread or else you'll have the last
# several rows missing
outfile = open(self.outfile, "w")
self.out_csvfile = csv.writer(outfile)

#Keep running until we see numprocs STOP messages
for works in range(self.numprocs):
for i, val in iter(self.outq.get, "STOP"):
# verify rows are in order, if not save in buffer
if i != cur:
buffer[i] = val
else:
#if yes are write it out and make sure no waiting rows exist
self.out_csvfile.writerow( [i, val] )
cur += 1
while cur in buffer:
self.out_csvfile.writerow([ cur, buffer[cur] ])
del buffer[cur]
cur += 1

outfile.close()

def main(argv):
cli_parser = make_cli_parser()
opts, args = cli_parser.parse_args(argv)
if len(args) != 2:
cli_parser.error("Please provide an input file and output file.")

c = CSVWorker(opts.numprocs, args[0], args[1])

if __name__ == '__main__':
main(sys.argv[1:])

This is the error I get when running under Windows:

Traceback (most recent call last):
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 130, in <module>
    main(sys.argv[1:])
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 127, in main
    c = CSVWorker(opts.numprocs, args[0], args[1])
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 44, in __init__
    self.pin.start()
  File "C:\Python27\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Python27\lib\multiprocessing\forking.py", line 271, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Python27\lib\multiprocessing\forking.py", line 193, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Python27\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\multiprocessing\forking.py", line 66, in dispatcher
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 401, in save_reduce
    save(args)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 548, in save_tuple
    save(element)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 396, in save_reduce
    save(cls)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 753, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <type '_csv.reader'>: it's not the same object as _csv.reader
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python27\lib\multiprocessing\forking.py", line 374, in main
    self = load(from_parent)
  File "C:\Python27\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Python27\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Python27\lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError

Best Answer

The problem you are running into is caused by using methods of the CSVWorker class as Process targets; that class has members that cannot be pickled, and those open files are never going to work.
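
To see why: Windows has no fork, so multiprocessing spawns a fresh interpreter and pickles the Process object to send it to the child (that is the dump(process_obj, ...) call in your traceback). Since your targets are bound methods, pickling them drags in the entire CSVWorker instance, including the open file and the csv.reader. A minimal sketch that reproduces the same failure without multiprocessing (the Holder class and the input.csv path are purely illustrative):

import csv
import pickle

class Holder(object):
    def __init__(self, path):
        self.infile = open(path)                # file objects are not picklable
        self.reader = csv.reader(self.infile)   # neither is _csv.reader

h = Holder("input.csv")  # hypothetical input file
pickle.dumps(h)          # raises the same kind of PicklingError/TypeError
                         # that Process.start() triggers on Windows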

What you want to do is break that class into two: one that coordinates all of the worker subprocesses, and one that actually does the computational work. The worker processes should take filenames as arguments and open the individual files as needed, or at least wait until their work methods are invoked before opening files. They can also take multiprocessing.Queues as arguments or as instance members; that is safe to pass around.

To an extent you already do this: your write_output_csv method opens its file inside the subprocess, but your parse_input_csv method expects to find an already opened and prepared file as an attribute of self. Do it consistently the other way around and you should be in good shape.
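
For instance, the input side might become a plain module-level function that receives only picklable arguments (a filename string and the queues) and opens the file itself, mirroring what write_output_csv already does. A rough sketch under those assumptions (the parent-side names input_name and opts.numprocs are illustrative), not a drop-in rewrite of the whole class:

import csv
import multiprocessing

def parse_input_csv(filename, inq, numprocs):
    """Runs in the child process: open the CSV here, not in the parent."""
    infile = open(filename)
    try:
        for i, row in enumerate(csv.reader(infile)):
            inq.put((i, [int(entry) for entry in row]))
    finally:
        infile.close()
    # One sentinel per worker, as in the original code.
    for _ in range(numprocs):
        inq.put("STOP")

# In the parent, pass only the filename and the queue -- both picklable:
# pin = multiprocessing.Process(target=parse_input_csv,
#                               args=(input_name, inq, opts.numprocs))
# pin.start()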

Regarding python - "Can't pickle <type '_csv.reader'>" error when using multiprocessing on Windows, the original question can be found on Stack Overflow: https://stackoverflow.com/questions/8514238/
