gpt4 book ai didi

python - multiprocessing.Pool - PicklingError : Can't pickle : attribute lookup thread. 锁定失败

转载 作者:IT老高 更新时间:2023-10-28 22:00:46 31 4
gpt4 key购买 nike

multiprocessing.Pool 快把我逼疯了...
我想升级许多软件包,并且对于每个软件包,我都必须检查是否有更高版本。这是由 check_one 函数完成的。
主要代码在 Updater.update 方法中:在那里我创建了 Pool 对象并调用 map() 方法。

代码如下:

def check_one(args):
res, total, package, version = args
i = res.qsize()
logger.info('\r[{0:.1%} - {1}, {2} / {3}]',
i / float(total), package, i, total, addn=False)
try:
json = PyPIJson(package).retrieve()
new_version = Version(json['info']['version'])
except Exception as e:
logger.error('Error: Failed to fetch data for {0} ({1})', package, e)
return
if new_version > version:
res.put_nowait((package, version, new_version, json))

class Updater(FileManager):

# __init__ and other methods...

def update(self):
logger.info('Searching for updates')
packages = Queue.Queue()
data = ((packages, self.set_len, dist.project_name, Version(dist.version)) \
for dist in self.working_set)
pool = multiprocessing.Pool()
pool.map(check_one, data)
pool.close()
pool.join()
while True:
try:
package, version, new_version, json = packages.get_nowait()
except Queue.Empty:
break
txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(package,
new_version,
version)
u = logger.ask(txt, bool=('upgrade version', 'keep working version'), dont_ask=self.yes)
if u:
self.upgrade(package, json, new_version)
else:
logger.info('{0} has not been upgraded', package)
self._clean()
logger.success('Updating finished successfully')

当我运行它时,我得到了这个奇怪的错误:

Searching for updates
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

最佳答案

multiprocessing 通过 mp.SimpleQueue 将任务(包括 check_onedata)传递给工作进程。与 Queue.Queue 不同,放在 mp.SimpleQueue 中的所有内容都必须是可选择的。 Queue.Queues 是不可挑选的:

import multiprocessing as mp
import Queue

def foo(queue):
pass

pool=mp.Pool()
q=Queue.Queue()

pool.map(foo,(q,))

产生此异常:

UnpickleableError: Cannot pickle <type 'thread.lock'> objects

您的 data 包括 packages,这是一个 Queue.Queue。这可能是问题的根源。


这是一个可能的解决方法:Queue 用于两个目的:

  1. 找出近似大小(通过调用qsize)
  2. 存储结果以供日后检索。

我们可以使用 mp.Value,而不是调用 qsize,以便在多个进程之间共享一个值。

我们可以(并且应该)只返回来自对 check_one 的调用的值,而不是将结果存储在队列中。 pool.map 将结果收集到自己制作的队列中,并将结果作为 pool.map 的返回值返回。

例如:

import multiprocessing as mp
import Queue
import random
import logging

# logger=mp.log_to_stderr(logging.DEBUG)
logger = logging.getLogger(__name__)


qsize = mp.Value('i', 1)
def check_one(args):
total, package, version = args
i = qsize.value
logger.info('\r[{0:.1%} - {1}, {2} / {3}]'.format(
i / float(total), package, i, total))
new_version = random.randrange(0,100)
qsize.value += 1
if new_version > version:
return (package, version, new_version, None)
else:
return None

def update():
logger.info('Searching for updates')
set_len=10
data = ( (set_len, 'project-{0}'.format(i), random.randrange(0,100))
for i in range(set_len) )
pool = mp.Pool()
results = pool.map(check_one, data)
pool.close()
pool.join()
for result in results:
if result is None: continue
package, version, new_version, json = result
txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(
package, new_version, version)
logger.info(txt)
logger.info('Updating finished successfully')

if __name__=='__main__':
logging.basicConfig(level=logging.DEBUG)
update()

关于python - multiprocessing.Pool - PicklingError : Can't pickle <type 'thread.lock' >: attribute lookup thread. 锁定失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7865430/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com