- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试实现 multiprocessing读取和比较两个 csv 文件的方法。为了让我开始,我从 embarassingly parallel problems 的代码示例开始。 ,它对文件中的整数求和。问题是这个例子不会为我运行。 (我在 Windows 上运行 Python 2.6。)
我收到以下 EOF 错误:
File "C:\Python26\lib\pickle.py", line 880, in load_eof
raise EOFError
EOFError
在这一行:
self.pin.start()
我找到了一些 examples这表明问题可能是 csv 打开方法需要是“rb”。我试过了,但也没用。
然后我尝试简化代码以在最基本的层面上重现错误。我在同一条线上遇到了同样的错误。即使我简化了 parse_input_csv 函数甚至不读取文件。 (不确定如果文件未被读取,EOF 是如何触发的?)
import csv
import multiprocessing
class CSVWorker(object):
def __init__(self, infile, outfile):
#self.infile = open(infile)
self.infile = open(infile, 'rb') #try rb for Windows
self.in_csvfile = csv.reader(self.infile)
self.inq = multiprocessing.Queue()
self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
self.pin.start()
self.pin.join()
self.infile.close()
def parse_input_csv(self):
# for i, row in enumerate(self.in_csvfile):
# self.inq.put( (i, row) )
# for row in self.in_csvfile:
# print row
# #self.inq.put( row )
print 'yup'
if __name__ == '__main__':
c = CSVWorker('random_ints.csv', 'random_ints_sums.csv')
print 'done'
最后,我尝试将其全部拉到一个类之外。如果我不遍历 csv,这会起作用,但如果我这样做,则会出现相同的错误。
def manualCSVworker(infile, outfile):
f = open(infile, 'rb')
in_csvfile = csv.reader(f)
inq = multiprocessing.Queue()
# this works (no reading csv file)
pin = multiprocessing.Process(target=manual_parse_input_csv, args=(in_csvfile,))
# this does not work (tries to read csv, and fails with EOFError)
#pin = multiprocessing.Process(target=print_yup, args=())
pin.start()
pin.join()
f.close()
def print_yup():
print 'yup'
def manual_parse_input_csv(csvReader):
for row in csvReader:
print row
if __name__ == '__main__':
manualCSVworker('random_ints.csv', 'random_ints_sums.csv')
print 'done'
有人可以帮我找出这里的问题吗?
编辑:只是想我会发布工作代码。我最终放弃了 Class 实现。正如 Tim Peters 所建议的那样,我只传递文件名(而不是打开的文件)。
在 500 万行 x 2 列上,我注意到使用 2 个处理器与使用 1 个处理器相比,时间缩短了大约 20%。我预计会多一些,但我认为问题在于排队的额外开销。根据 this thread , 改进可能是以 100 个或更多的 block (而不是每行)为单位对记录进行排队。
import csv
import multiprocessing
from datetime import datetime
NUM_PROCS = multiprocessing.cpu_count()
def main(numprocsrequested, infile, outfile):
inq = multiprocessing.Queue()
outq = multiprocessing.Queue()
numprocs = min(numprocsrequested, NUM_PROCS)
pin = multiprocessing.Process(target=parse_input_csv, args=(infile,numprocs,inq,))
pout = multiprocessing.Process(target=write_output_csv, args=(outfile,numprocs,outq,))
ps = [ multiprocessing.Process(target=sum_row, args=(inq,outq,)) for i in range(numprocs)]
pin.start()
pout.start()
for p in ps:
p.start()
pin.join()
i = 0
for p in ps:
p.join()
#print "Done", i
i += 1
pout.join()
def parse_input_csv(infile, numprocs, inq):
"""Parses the input CSV and yields tuples with the index of the row
as the first element, and the integers of the row as the second
element.
The index is zero-index based.
The data is then sent over inqueue for the workers to do their
thing. At the end the input thread sends a 'STOP' message for each
worker.
"""
f = open(infile, 'rb')
in_csvfile = csv.reader(f)
for i, row in enumerate(in_csvfile):
row = [ int(entry) for entry in row ]
inq.put( (i,row) )
for i in range(numprocs):
inq.put("STOP")
f.close()
def sum_row(inq, outq):
"""
Workers. Consume inq and produce answers on outq
"""
tot = 0
for i, row in iter(inq.get, "STOP"):
outq.put( (i, sum(row)) )
outq.put("STOP")
def write_output_csv(outfile, numprocs, outq):
"""
Open outgoing csv file then start reading outq for answers
Since I chose to make sure output was synchronized to the input there
is some extra goodies to do that.
Obviously your input has the original row number so this is not
required.
"""
cur = 0
stop = 0
buffer = {}
# For some reason csv.writer works badly across threads so open/close
# and use it all in the same thread or else you'll have the last
# several rows missing
f = open(outfile, 'wb')
out_csvfile = csv.writer(f)
#Keep running until we see numprocs STOP messages
for works in range(numprocs):
for i, val in iter(outq.get, "STOP"):
# verify rows are in order, if not save in buffer
if i != cur:
buffer[i] = val
else:
#if yes are write it out and make sure no waiting rows exist
out_csvfile.writerow( [i, val] )
cur += 1
while cur in buffer:
out_csvfile.writerow([ cur, buffer[cur] ])
del buffer[cur]
cur += 1
f.close()
if __name__ == '__main__':
startTime = datetime.now()
main(4, 'random_ints.csv', 'random_ints_sums.csv')
print 'done'
print(datetime.now()-startTime)
最佳答案
跨进程传递对象需要在发送端“pickling”它(创建对象的字符串表示)并在接收端“unpickling”它(从字符串表示重新创建同构对象)。除非您确切地知道自己在做什么,否则您应该坚持传递内置 Python 类型(字符串、整数、 float 、列表、字典等)或由 multiprocessing
实现的类型( Lock()
, Queue()
,...)。否则 pickle-unpickle 舞蹈很可能不会奏效。
传递一个打开的文件永远不会奏效,更不用说一个包含在另一个对象中的打开文件(例如由 csv.reader(f)
返回)。当我运行你的代码时,我收到了来自 pickle
的错误消息:
pickle.PicklingError: Can't pickle <type '_csv.reader'>: it's not the same object as _csv.reader
不是吗?永远不要忽略错误 - 除非您再次确切地知道自己在做什么。
解决方案很简单:正如我在评论中所说,在 工作进程中打开文件,只需传递其字符串路径即可。例如,改用它:
def manual_parse_input_csv(csvfile):
f = open(csvfile,'rb')
in_csvfile = csv.reader(f)
for row in in_csvfile:
print row
f.close()
并从 manualCSVworker
中 取出所有代码,并将流程创建行更改为:
pin = multiprocessing.Process(target=manual_parse_input_csv, args=(infile,))
看到了吗?这会传递文件路径,一个纯字符串。行得通:-)
关于csv 文件上的 Python 多处理 EOF 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19613370/
我已经使用 vue-cli 两个星期了,直到今天一切正常。我在本地建立这个项目。 https://drive.google.com/open?id=0BwGw1zyyKjW7S3RYWXRaX24tQ
您好,我正在尝试使用 python 库 pytesseract 从图像中提取文本。请找到代码: from PIL import Image from pytesseract import image_
我的错误 /usr/bin/ld: errno: TLS definition in /lib/libc.so.6 section .tbss mismatches non-TLS reference
我已经训练了一个模型,我正在尝试使用 predict函数但它返回以下错误。 Error in contrasts<-(*tmp*, value = contr.funs[1 + isOF[nn]])
根据Microsoft DataConnectors的信息我想通过 this ODBC driver 创建一个从 PowerBi 到 PostgreSQL 的连接器使用直接查询。我重用了 Micros
我已经为 SoundManagement 创建了一个包,其中有一个扩展 MediaPlayer 的类。我希望全局控制这个变量。这是我的代码: package soundmanagement; impo
我在Heroku上部署了一个应用程序。我正在使用免费服务。 我经常收到以下错误消息。 PG::Error: ERROR: out of memory 如果刷新浏览器,就可以了。但是随后,它又随机发生
我正在运行 LAMP 服务器,这个 .htaccess 给我一个 500 错误。其作用是过滤关键字并重定向到相应的域名。 Options +FollowSymLinks RewriteEngine
我有两个驱动器 A 和 B。使用 python 脚本,我在“A”驱动器中创建一些文件,并运行 powerscript,该脚本以 1 秒的间隔将驱动器 A 中的所有文件复制到驱动器 B。 我在 powe
下面的函数一直返回这个错误信息。我认为可能是 double_precision 字段类型导致了这种情况,我尝试使用 CAST,但要么不是这样,要么我没有做对...帮助? 这是错误: ERROR: i
这个问题已经有答案了: Syntax error due to using a reserved word as a table or column name in MySQL (1 个回答) 已关闭
我的数据库有这个小问题。 我创建了一个表“articoli”,其中包含商品的品牌、型号和价格。 每篇文章都由一个 id (ID_ARTICOLO)` 定义,它是一个自动递增字段。 好吧,现在当我尝试插
我是新来的。我目前正在 DeVry 在线学习中级 C++ 编程。我们正在使用 C++ Primer Plus 这本书,到目前为止我一直做得很好。我的老师最近向我们扔了一个曲线球。我目前的任务是这样的:
这个问题在这里已经有了答案: What is an undefined reference/unresolved external symbol error and how do I fix it?
我的网站中有一段代码有问题;此错误仅发生在 Internet Explorer 7 中。 我没有在这里发布我所有的 HTML/CSS 标记,而是发布了网站的一个版本 here . 如您所见,我在列中有
如果尝试在 USB 设备上构建 node.js 应用程序时在我的树莓派上使用 npm 时遇到一些问题。 package.json 看起来像这样: { "name" : "node-todo",
在 Python 中,您有 None单例,在某些情况下表现得很奇怪: >>> a = None >>> type(a) >>> isinstance(a,None) Traceback (most
这是我的 build.gradle (Module:app) 文件: apply plugin: 'com.android.application' android { compileSdkV
我是 android 的新手,我的项目刚才编译和运行正常,但在我尝试实现抽屉导航后,它给了我这个错误 FAILURE: Build failed with an exception. What wen
谁能解释一下?我想我正在做一些非常愚蠢的事情,并且急切地等待着启蒙。 我得到这个输出: phpversion() == 7.2.25-1+0~20191128.32+debian8~1.gbp108
我是一名优秀的程序员,十分优秀!