- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
multiprocessing.Pool
快把我逼疯了...
我想升级许多软件包,并且对于每个软件包,我都必须检查是否有更高版本。这是由 check_one
函数完成的。
主要代码在 Updater.update
方法中:在那里我创建了 Pool 对象并调用 map()
方法。
代码如下:
def check_one(args):
res, total, package, version = args
i = res.qsize()
logger.info('\r[{0:.1%} - {1}, {2} / {3}]',
i / float(total), package, i, total, addn=False)
try:
json = PyPIJson(package).retrieve()
new_version = Version(json['info']['version'])
except Exception as e:
logger.error('Error: Failed to fetch data for {0} ({1})', package, e)
return
if new_version > version:
res.put_nowait((package, version, new_version, json))
class Updater(FileManager):
# __init__ and other methods...
def update(self):
logger.info('Searching for updates')
packages = Queue.Queue()
data = ((packages, self.set_len, dist.project_name, Version(dist.version)) \
for dist in self.working_set)
pool = multiprocessing.Pool()
pool.map(check_one, data)
pool.close()
pool.join()
while True:
try:
package, version, new_version, json = packages.get_nowait()
except Queue.Empty:
break
txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(package,
new_version,
version)
u = logger.ask(txt, bool=('upgrade version', 'keep working version'), dont_ask=self.yes)
if u:
self.upgrade(package, json, new_version)
else:
logger.info('{0} has not been upgraded', package)
self._clean()
logger.success('Updating finished successfully')
当我运行它时,我得到了这个奇怪的错误:
Searching for updates
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
最佳答案
multiprocessing 通过 mp.SimpleQueue
将任务(包括 check_one
和 data
)传递给工作进程。与 Queue.Queue
不同,放在 mp.SimpleQueue
中的所有内容都必须是可选择的。 Queue.Queue
s 是不可挑选的:
import multiprocessing as mp
import Queue
def foo(queue):
pass
pool=mp.Pool()
q=Queue.Queue()
pool.map(foo,(q,))
产生此异常:
UnpickleableError: Cannot pickle <type 'thread.lock'> objects
您的 data
包括 packages
,这是一个 Queue.Queue。这可能是问题的根源。
这是一个可能的解决方法:Queue
用于两个目的:
qsize
)我们可以使用 mp.Value
,而不是调用 qsize
,以便在多个进程之间共享一个值。
我们可以(并且应该)只返回来自对 check_one
的调用的值,而不是将结果存储在队列中。 pool.map
将结果收集到自己制作的队列中,并将结果作为 pool.map
的返回值返回。
例如:
import multiprocessing as mp
import Queue
import random
import logging
# logger=mp.log_to_stderr(logging.DEBUG)
logger = logging.getLogger(__name__)
qsize = mp.Value('i', 1)
def check_one(args):
total, package, version = args
i = qsize.value
logger.info('\r[{0:.1%} - {1}, {2} / {3}]'.format(
i / float(total), package, i, total))
new_version = random.randrange(0,100)
qsize.value += 1
if new_version > version:
return (package, version, new_version, None)
else:
return None
def update():
logger.info('Searching for updates')
set_len=10
data = ( (set_len, 'project-{0}'.format(i), random.randrange(0,100))
for i in range(set_len) )
pool = mp.Pool()
results = pool.map(check_one, data)
pool.close()
pool.join()
for result in results:
if result is None: continue
package, version, new_version, json = result
txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(
package, new_version, version)
logger.info(txt)
logger.info('Updating finished successfully')
if __name__=='__main__':
logging.basicConfig(level=logging.DEBUG)
update()
关于python - multiprocessing.Pool - PicklingError : Can't pickle <type 'thread.lock' >: attribute lookup thread. 锁定失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7865430/
我是 CAN 协议(protocol)的新手,正在阅读 Robert Bosch 的 CAN 规范 ver2.0 B 部分。我无法理解第 63 页上的以下几行 ”注意:启动/唤醒:如果在启动期间只有一
我用 C 写了一些代码来读取 CAN 总线数据。当我读取 11 位 CAN ID 时一切正常。一旦我尝试读取 29 位 ID,它就会错误地显示 ID。 示例: 接收29位ID的消息: 0x01F0A0
如果这看起来与另一个问题相似或者看起来已经得到回答,我提前道歉。我觉得它非常详细,足以证明自己的问题。 我正在尝试寻找一个虚拟的 CAN 总线模拟器(或一些可以轻松制作模拟器的方法),它只会生成 CA
我的问题涉及 GNU 的品牌。 如果您有一系列命令可用作多个目标的配方,则 canned recipe派上用场了。我可能看起来像这样: define run-foo # Here comes a #
您好,我是一名学习canopen的学生。Canopen中的COB-ID和CAN标识符有什么关系?我在CIA主页上看到COB-ID不是CAN ID,但我不明白。 例如,如果 PDO 通过 CAN 总线传
我知道一个显性确认位是由另一个节点传输的消息的接收器发送的。 我无法理解的是,接收方是在接收到整个消息后发送单个显性位,还是接收者发送相同的消息,其中 ACK 位字段为显性? 或者是接收器在发送器传输
我是 CAN 协议(protocol)的新手,我正在尝试通过 Linux 的 SocketCAN 使用它。然而,我对可用的 2 种不同的 CAN 套接字(RAW 和广播管理器 (BCM))感到困惑。
就目前情况而言,这个问题不太适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、民意调查或扩展讨论。如果您觉得这个问题可以改进并可能重新开放,visit
我正在尝试制作一个在 Windows 下运行并与 ELM327 设备通信的软件。我创建了第一个版本,然后我进入了我的 SMART ForTwo (SMART 451) 车辆,我设法连接了仪表盘(发送
我知道在 CAN Controller 中,如果错误计数达到某个阈值(比如 255),就会发生总线关闭,这意味着特定的 CAN 节点将从 CAN 网络中关闭。所以根本不会有任何交流。但是,如果上述情况
我正在使用 ELM327,我希望能够设置要发送的 CAN 消息的 header 和数据部分。我看到有一个代码用于设置消息的标题 SH xxyyzz 但是我很难找出如何设置数据部分并控制何时发送消息。
我想做的是: 将数据插入具有两列的表中,并在同一 PHP 页面中显示更新的值。我能够获取数据并显示它,但无法插入任何数据。请指导我。 文件名为 mypage.php 到目前为止我的代码:
(这个问题是关于 Android 11 的) 我想将崩溃日志打印到其他应用程序可以读取的文件中(具体来说,我希望能够导航到该文件并使用"file"应用程序查看数据)。 我看过很多关于这个问题的答案,但
这会产生“ fatal error :无法解开Optional.None”,我似乎不明白为什么 var motionManager = CMMotionManager() motionManager.
在 Java 中,我经常遇到带有后缀 -able 的接口(interface),例如可序列化、可迭代等。这表明实现这些接口(interface)的对象具有可以对其执行某些操作的特性,例如该对象可以被序
我正在阅读 CanJS API 文档并遇到 can.Construct.extend http://canjs.com/docs/can.Construct.extend.html .我知道 can.
我正在使用 C 语言在 STM32F1xx 上进行开发,直到现在我都在尝试使用“CANopenNode-master”实现 CANopen 堆栈,并且我正在使用 2 个中断。 第一个是用于处理 SYN
我一直在使用 SocketCAN,尤其是 Virtual CAN vcan。但是,到目前为止,我从未使用过 CAN FD(灵活数据速率)。 好吧,我今天早上用 can-utils 试了一下: cans
我正在运行一个带有两个 CAN channel 的程序(使用 TowerTech CAN Cape TT3201)。 两个 channel 是 can0 (500k) 和 can1 (125k)。 c
存储由序列字符组成的字符串的 %s 格式说明符可以存储整数序列吗?如果是的话..你能解释一下吗? 最佳答案 无论如何,数字都是用字符表示的,所以是的,您可以使用 "%s" 说明符读取数字并将其存储在
我是一名优秀的程序员,十分优秀!