- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
Python 2.7.3
我有一个包含数千个数据文件的文件夹。每个数据文件都会被输入构造函数并进行大量处理。现在我正在迭代这些文件并按顺序处理它们:
class Foo:
def __init__(self,file):
self.bar = do_lots_of_stuff_with_numpy_and_scipy(file)
def do_lots_of_stuff_with_numpy_and_scipy(file):
pass
def get_foos(dir):
return [Foo(os.path.join(dir,file)) for file in os.listdir(dir)]
这工作得很漂亮,但是太慢了。我想并行地做这件事。我尝试过:
def parallel_get_foos(dir):
p = Pool()
foos = p.map(Foo, [os.path.join(dir,file) for file in os.listdir(dir)])
p.close()
p.join()
return foos
if __name__ == "__main__":
foos = parallel_get_foos(sys.argv[1])
但它只是因为很多这样的错误而出错:
Process PoolWorker-7:
Traceback (most recent call last):
File "/l/python2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/l/python2.7/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/l/python2.7/lib/python2.7/multiprocessing/pool.py", line 99, in worker
put((job, i, result))
File "/l/python2.7/lib/python2.7/multiprocessing/queues.py", line 390, in put
return send(obj)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我尝试创建一个函数来返回对象,例如:
def get_foo(file):
return Foo(file)
def parallel_get_foos(dir):
...
foos = p.map(get_foo, [os.path.join(dir,file) for file in os.listdir(dir)])
...
但正如预期的那样,我得到了同样的错误。
我已经阅读了大量类似的线程,试图解决类似于此的问题,但没有一个解决方案对我有帮助。所以我很感谢任何帮助!
编辑:
Bakuriu 正确地推测我在 do_lots_of_stuff 方法中定义了一个非顶级函数。具体来说,我正在做以下事情:
def fit_curve(data,degree):
"""Fits a least-square polynomial function to the given data."""
sorted = data[data[:,0].argsort()].T
coefficients = numpy.polyfit(sorted[0],sorted[1],degree)
def eval(val,deg=degree):
res = 0
for coefficient in coefficients:
res += coefficient*val**deg
deg -= 1
return res
return eval
有什么办法可以让这个函数变得可 pickle 吗?
最佳答案
你正在做的事情(至少,你在例子中展示的),实际上工作得很好:
$mkdir TestPool
$cd TestPool/
$for i in {1..100}
> do
> touch "test$i"
> done
$ls
test1 test18 test27 test36 test45 test54 test63 test72 test81 test90
test10 test19 test28 test37 test46 test55 test64 test73 test82 test91
test100 test2 test29 test38 test47 test56 test65 test74 test83 test92
test11 test20 test3 test39 test48 test57 test66 test75 test84 test93
test12 test21 test30 test4 test49 test58 test67 test76 test85 test94
test13 test22 test31 test40 test5 test59 test68 test77 test86 test95
test14 test23 test32 test41 test50 test6 test69 test78 test87 test96
test15 test24 test33 test42 test51 test60 test7 test79 test88 test97
test16 test25 test34 test43 test52 test61 test70 test8 test89 test98
test17 test26 test35 test44 test53 test62 test71 test80 test9 test99
$vi test_pool_dir.py
$cat test_pool_dir.py
import os
import multiprocessing
class Foo(object):
def __init__(self, fname):
self.fname = fname #or your calculations
def parallel_get_foos(directory):
p = multiprocessing.Pool()
foos = p.map(Foo, [os.path.join(directory, fname) for fname in os.listdir(directory)])
p.close()
p.join()
return foos
if __name__ == '__main__':
foos = parallel_get_foos('.')
print len(foos) #expected 101: 100 files plus this script
$python test_pool_dir.py
101
版本信息:
$python --version
Python 2.7.3
$uname -a
Linux giacomo-Acer 3.2.0-39-generic #62-Ubuntu SMP Thu Feb 28 00:28:53 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux
我的猜测是,您没有完全按照您在代码示例中显示的方式进行操作。例如,我在执行此操作时收到与您类似的错误:
>>> import pickle
>>> def test():
... def test2(): pass
... return test2
...
>>> import multiprocessing
>>> p = multiprocessing.Pool()
>>> p.map(test(), [1,2,3])
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 504, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
这是显而易见的:
>>> pickle.dumps(test())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/python2.7/pickle.py", line 1374, in dumps
Pickler(file, protocol).dump(obj)
File "/usr/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python2.7/pickle.py", line 748, in save_global
(obj, module, name))
pickle.PicklingError: Can't pickle <function test2 at 0x7fad15fc2938>: it's not found as __main__.test2
和pickle
的文档指出:
The following types can be pickled:
None
,True
, andFalse
- integers, long integers, floating point numbers, complex numbers
- normal and Unicode strings
tuple
s,list
s,set
s, and dictionaries containing only picklable objects- functions defined at the top level of a module
- built-in functions defined at the top level of a module
- classes that are defined at the top level of a module
- instances of such classes whose
__dict__
or the result of calling__getstate__()
is picklable (see section The pickle protocol for details).
继续:
Note that functions (built-in and user-defined) are pickled by “fully qualified” name reference, not by value. This means that only the function name is pickled, along with the name of the module the function is defined in. Neither the function’s code, nor any of its function attributes are pickled. Thus the defining module must be importable in the unpickling environment, and the module must contain the named object, otherwise an exception will be raised.
关于python - 与多处理池并行创建对象?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15760179/
有没有办法同时运行 2 个不同的代码块。我一直在研究 R 中的并行包,它们似乎都基于在循环中运行相同的函数。我正在寻找一种同时运行不同函数的方法(循环的 1 次迭代)。例如,我想在某个数据对象上创建一
无论如何增加 Parallel.For 启动后的循环次数?示例如下: var start = 0; var end = 5; Parallel.For(start, end, i => { C
我是 Golang 的新手,正在尝试了解并发和并行。我阅读了下面提到的关于并发和并行的文章。我执行了相同的程序。但没有得到相同的(混合字母和字符)输出。首先获取所有字母,然后获取字符。似乎并发不工作,
我正在寻找同时迭代 R 中两个或多个字符向量/列表的方法,例如。有没有办法做这样的事情: foo <- c('a','c','d') bar <- c('aa','cc','dd') for(i in
我对 Raku 很陌生,我对函数式方法有疑问,尤其是 reduce。 我最初有这样的方法: sub standardab{ my $mittel = mittel(@_); my $foo =
我最近花了很多时间来学习实时音频处理的细节,我发现的大多数库/工具都是c / c++代码或脚本/图形语言的形式,并在其中编译了c / c++代码。引擎盖。 使用基于回调的API,与GUI或App中的其
我正在使用 JMeter 进行图像负载测试。我有一个图像名称数组并遍历该数组,我通过 HTTP 请求获取所有图像。 -> loop_over_image - for loop controller
我整个晚上都在困惑这个问题...... makeflags = ['--prefix=/usr','--libdir=/usr/lib'] rootdir='/tmp/project' ps = se
我正在尝试提高计算图像平均值的方法的性能。 为此,我使用了两个 For 语句来迭代所有图像,因此我尝试使用一个 Parallel For 来改进它,但结果并不相同。 我做错了吗?或者是什么导致了差异?
假设您有一个并行 for 循环实现,例如ConcRT parallel_for,将所有工作放在一个 for 循环体内总是最好的吗? 举个例子: for(size_t i = 0; i < size()
我想并行运行一部分代码。目前我正在使用 Parallel.For 如何让10、20或40个线程同时运行 我当前的代码是: Parallel.For(1, total, (ii) =>
我使用 PAY API 进行了 PayPal 自适应并行支付,其中无论用户(买家)购买什么,都假设用户购买了总计 100 美元的商品。在我的自适应并行支付中,有 2 个接收方:Receiver1 和
我正在考虑让玩家加入游戏的高效算法。由于会有大量玩家,因此算法应该是异步的(即可扩展到集群中任意数量的机器)。有细节:想象有一个无向图(每个节点都是一个玩家)。玩家之间的每条边意味着玩家可以参加同一场
我有一个全局变量 volatile i = 0; 和两个线程。每个都执行以下操作: i++; System.out.print(i); 我收到以下组合。 12、21 和 22。 我理解为什么我没有得到
我有以下称为 pgain 的方法,它调用我试图并行化的方法 dist: /***************************************************************
我有一个 ruby 脚本读取一个巨大的表(约 2000 万行),进行一些处理并将其提供给 Solr 用于索引目的。这一直是我们流程中的一大瓶颈。我打算在这里加快速度,我想实现某种并行性。我对 Ru
我正在研究 Golang 并遇到一个问题,我已经研究了几天,我似乎无法理解 go routines 的概念以及它们的使用方式。 基本上我是在尝试生成数百万条随机记录。我有生成随机数据的函数,并将创建一
我希望 for 循环使用 go 例程并行。我尝试使用 channel ,但没有用。我的主要问题是,我想在继续之前等待所有迭代完成。这就是为什么在它不起作用之前简单地编写 go 的原因。我尝试使用 ch
我正在使用 import Control.Concurrent.ParallelIO.Global main = parallel_ (map processI [1..(sdNumber runPa
我正在尝试通过 makePSOCKcluster 连接到另一台计算机: library(parallel) cl ... doTryCatch -> recvData -> makeSOCKm
我是一名优秀的程序员,十分优秀!