gpt4 book ai didi

python - 多处理和 dill 可以一起做什么?

转载 作者:IT老高 更新时间:2023-10-28 21:33:39 26 4
gpt4 key购买 nike

我想在 Python 中使用 multiprocessing 库。遗憾的是,multiprocessing 使用了 pickle,它不支持带有闭包的函数、lambdas 或 __main__ 中的函数。这三个对我来说都很重要

In [1]: import pickle

In [2]: pickle.dumps(lambda x: x)
PicklingError: Can't pickle <function <lambda> at 0x23c0e60>: it's not found as __main__.<lambda>

幸好有dill更健壮的 pickle 。显然 dill 在导入时执行魔术以使 pickle 工作

In [3]: import dill

In [4]: pickle.dumps(lambda x: x)
Out[4]: "cdill.dill\n_load_type\np0\n(S'FunctionType'\np1 ...

这非常令人鼓舞,特别是因为我无法访问多处理源代码。可悲的是,我仍然无法让这个非常基本的示例工作

import multiprocessing as mp
import dill

p = mp.Pool(4)
print p.map(lambda x: x**2, range(10))

这是为什么?我错过了什么? multiprocessing+dill组合到底有什么限制?

J.F Sebastian 的临时编辑

mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py 
Temporary Edit for J.F Sebastian

mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
Exception in thread Thread-2:
Traceback (most recent call last):
File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 808, in __bootstrap_inner
self.run()
File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 761, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/mrockli/Software/anaconda/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

^C
...lots of junk...

[DEBUG/MainProcess] cleaning up worker 3
[DEBUG/MainProcess] cleaning up worker 2
[DEBUG/MainProcess] cleaning up worker 1
[DEBUG/MainProcess] cleaning up worker 0
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-7] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-8] child process calling self.run()Exception in thread Thread-2:
Traceback (most recent call last):
File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 808, in __bootstrap_inner
self.run()
File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 761, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/mrockli/Software/anaconda/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

^C
...lots of junk...

[DEBUG/MainProcess] cleaning up worker 3
[DEBUG/MainProcess] cleaning up worker 2
[DEBUG/MainProcess] cleaning up worker 1
[DEBUG/MainProcess] cleaning up worker 0
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-7] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-8] child process calling self.run()

最佳答案

multiprocessing 对 pickle 做出了一些错误的选择。不要误会我的意思,它做出了一些不错的选择,使其能够 pickle 某些类型,以便它们可以在池的 map 功能中使用。然而,由于我们有 dill 可以进行 pickle ,多处理自己的 pickle 变得有点限制。实际上,如果 multiprocessing 要使用 pickle 而不是 cPickle... 并且还放弃一些它自己的 pickle 覆盖,那么 dill 可以接管并为 multiprocessing 提供更完整的序列化。

在此之前,有一个名为 pathosmultiprocessing 分支(不幸的是,发布版本有点陈旧)消除了上述限制。 Pathos 还添加了多处理没有的一些不错的功能,例如 map 函数中的多参数。 Pathos 即将发布,经过一些温和的更新——主要是转换为 python 3.x。

Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import dill
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

只是为了展示一下 pathos.multiprocessing 可以做什么...

>>> def busy_add(x,y, delay=0.01):
... for n in range(x):
... x += n
... for n in range(y):
... y -= n
... import time
... time.sleep(delay)
... return x + y
...
>>> def busy_squared(x):
... import time, random
... time.sleep(2*random.random())
... return x*x
...
>>> def squared(x):
... return x*x
...
>>> def quad_factory(a=1, b=1, c=0):
... def quad(x):
... return a*x**2 + b*x + c
... return quad
...
>>> square_plus_one = quad_factory(2,0,1)
>>>
>>> def test1(pool):
... print pool
... print "x: %s\n" % str(x)
... print pool.map.__name__
... start = time.time()
... res = pool.map(squared, x)
... print "time to results:", time.time() - start
... print "y: %s\n" % str(res)
... print pool.imap.__name__
... start = time.time()
... res = pool.imap(squared, x)
... print "time to queue:", time.time() - start
... start = time.time()
... res = list(res)
... print "time to results:", time.time() - start
... print "y: %s\n" % str(res)
... print pool.amap.__name__
... start = time.time()
... res = pool.amap(squared, x)
... print "time to queue:", time.time() - start
... start = time.time()
... res = res.get()
... print "time to results:", time.time() - start
... print "y: %s\n" % str(res)
...
>>> def test2(pool, items=4, delay=0):
... _x = range(-items/2,items/2,2)
... _y = range(len(_x))
... _d = [delay]*len(_x)
... print map
... res1 = map(busy_squared, _x)
... res2 = map(busy_add, _x, _y, _d)
... print pool.map
... _res1 = pool.map(busy_squared, _x)
... _res2 = pool.map(busy_add, _x, _y, _d)
... assert _res1 == res1
... assert _res2 == res2
... print pool.imap
... _res1 = pool.imap(busy_squared, _x)
... _res2 = pool.imap(busy_add, _x, _y, _d)
... assert list(_res1) == res1
... assert list(_res2) == res2
... print pool.amap
... _res1 = pool.amap(busy_squared, _x)
... _res2 = pool.amap(busy_add, _x, _y, _d)
... assert _res1.get() == res1
... assert _res2.get() == res2
... print ""
...
>>> def test3(pool): # test against a function that should fail in pickle
... print pool
... print "x: %s\n" % str(x)
... print pool.map.__name__
... start = time.time()
... res = pool.map(square_plus_one, x)
... print "time to results:", time.time() - start
... print "y: %s\n" % str(res)
...
>>> def test4(pool, maxtries, delay):
... print pool
... m = pool.amap(busy_add, x, x)
... tries = 0
... while not m.ready():
... time.sleep(delay)
... tries += 1
... print "TRY: %s" % tries
... if tries >= maxtries:
... print "TIMEOUT"
... break
... print m.get()
...
>>> import time
>>> x = range(18)
>>> delay = 0.01
>>> items = 20
>>> maxtries = 20
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> pool = Pool(nodes=4)
>>> test1(pool)
<pool ProcessingPool(ncpus=4)>
x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

map
time to results: 0.0553691387177
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

imap
time to queue: 7.91549682617e-05
time to results: 0.102381229401
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

amap
time to queue: 7.08103179932e-05
time to results: 0.0489699840546
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

>>> test2(pool, items, delay)
<built-in function map>
<bound method ProcessingPool.map of <pool ProcessingPool(ncpus=4)>>
<bound method ProcessingPool.imap of <pool ProcessingPool(ncpus=4)>>
<bound method ProcessingPool.amap of <pool ProcessingPool(ncpus=4)>>

>>> test3(pool)
<pool ProcessingPool(ncpus=4)>
x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

map
time to results: 0.0523059368134
y: [1, 3, 9, 19, 33, 51, 73, 99, 129, 163, 201, 243, 289, 339, 393, 451, 513, 579]

>>> test4(pool, maxtries, delay)
<pool ProcessingPool(ncpus=4)>
TRY: 1
TRY: 2
TRY: 3
TRY: 4
TRY: 5
TRY: 6
TRY: 7
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34]

关于python - 多处理和 dill 可以一起做什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19984152/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com