- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我是装饰器新手,对于第一个装饰器项目来说,这可能比我能咀嚼的要多,但我想做的是制作一个并行
装饰器,它接受一个看起来像的函数就像它谦虚地应用于单个参数,并自动使用多处理
分配它,并将其转换为应用于参数列表的函数。
我正在跟进 this very helpful answer对于之前的问题,这样我就可以成功地腌制类实例方法,并且我可以获得像答案这样的示例,可以很好地工作。
这是我第一次尝试并行装饰器(在咨询了一些关于线程装饰器的网络点击之后)。
###########
# Imports #
###########
import types, copy_reg, multiprocessing as mp
import pandas, numpy as np
### End Imports
##################
# Module methods #
##################
# Parallel decorator
def parallel(f):
def executor(*args):
_pool = mp.Pool(2)
_result = _pool.map_async(f, args[1:])
# I used args[1:] because the input will be a
# class instance method, so gotta skip over the self object.
# but it seems like there ought to be a better way...
_pool.close()
_pool.join()
return _result.get()
return executor
### End parallel
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
cls_name = ''
if func_name.startswith('__') and not func_name.endswith('__'):
cls_name = cls.__name__.lstrip('_')
if cls_name:
func_name = '_' + cls_name + func_name
return _unpickle_method, (func_name, obj, cls)
### End _pickle_method
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
### End _unpickle_method
# This call to copy_reg.pickle allows you to pass methods as the first arg to
# mp.Pool methods. If you comment out this line, `pool.map(self.foo, ...)` results in
# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
# __builtin__.instancemethod failed
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
copy_reg.pickle(types.FunctionType, _pickle_method, _unpickle_method)
### End Module methods
##################
# Module classes #
##################
class Foo(object):
def __init__(self, args):
self.my_args = args
### End __init__
def squareArg(self, arg):
return arg**2
### End squareArg
def par_squareArg(self):
p = mp.Pool(2) # Replace 2 with the number of processors.
q = p.map_async(self.squareArg, self.my_args)
p.close()
p.join()
return q.get()
### End par_SquarArg
@parallel
def parSquare(self, num):
return self.squareArg(num)
### End parSquare
### End Foo
### End Module classes
###########
# Testing #
###########
if __name__ == "__main__":
myfoo = Foo([1,2,3,4])
print myfoo.par_squareArg()
print myfoo.parSquare(myfoo.my_args)
### End Testing
但是当我使用这种方法时(愚蠢地尝试使用相同的 _pickle_method
和 _unpickle_method
来强化酸洗功能),我首先收到一个错误, AttributeError: 'function' 对象没有属性 'im_func'
但更常见的是,该错误表示函数无法被 pickle。
所以问题是双重的。 (1) 如何修 retrofit 饰器,以便如果它所采用的 f
对象是类的实例方法,那么它返回的 executor
也是类的实例方法那个类对象(这样就不会发生无法pickle的事情,因为我可以pickle那些实例方法)? (2) 创建额外的 _pickle_function
和 _unpickle_function
方法是否更好?我认为Python可以pickle模块级函数,所以如果我的代码没有导致executor
成为一个实例方法,那么它似乎应该是一个模块级函数,但是为什么不能是腌制的吗?
最佳答案
(1) How could I modify the decorator so that if the f object it takes is an instance method of a class, then the executor it returns is also an instance method of that class object (so that this business about not being able to pickle does not happen, since I can pickle those instance methods)?
>>> myfoo.parSquare
<bound method Foo.executor of <__main__.Foo object at 0x101332510>>
正如你所看到的 parSquare 实际上是执行器,它已经成为一个实例方法,这并不奇怪,因为装饰器是一种函数包装器......
How to make a chain of function decorators?可能对装饰器有最好的描述。
(2) Is it better to create addiitional _pickle_function and _unpickle_function methods?
你不需要 python 已经支持它们,事实上这个 copy_reg.pickle(types.FunctionType, _pickle_method, _unpickle_method)
似乎有点奇怪,因为您使用相同的算法来腌制这两种类型。
现在更大的问题是为什么我们会得到 PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
错误本身似乎有些模糊,但看起来它无法查找某些东西,我们的函数?
我认为发生的事情是装饰器正在用您案例中内部定义的函数覆盖该函数 parSquare
变成executor
但是executor
是 parallel
的内部函数因此它不重要,因此查找似乎失败,这只是一种预感。
让我们尝试一个更简单的示例。
>>> def parallel(function):
... def apply(values):
... from multiprocessing import Pool
... pool = Pool(4)
... result = pool.map(function, values)
... pool.close()
... pool.join()
... return result
... return apply
...
>>> @parallel
... def square(value):
... return value**2
...
>>>
>>> square([1,2,3,4])
Exception in thread Thread-1:
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 522, in __bootstrap_inner
self.run()
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 477, in run
self.__target(*self.__args, **self.__kwargs)
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
与我们遇到的错误几乎相同。
请注意,上面的代码相当于:
def parallel(function):
def apply(values):
from multiprocessing import Pool
pool = Pool(4)
result = pool.map(function, values)
pool.close()
pool.join()
return result
return apply
def square(value):
return value**2
square = parallel(square)
这会产生相同的错误,另请注意,如果我们不重命名我们的函数。
>>> def parallel(function):
... def apply(values):
... from multiprocessing import Pool
... pool = Pool(4)
... result = pool.map(function, values)
... pool.close()
... pool.join()
... return result
... return apply
...
>>> def _square(value):
... return value**2
...
>>> square = parallel(_square)
>>> square([1,2,3,4])
[1, 4, 9, 16]
>>>
它工作得很好,我一直在寻找一种方法来控制装饰器使用名称的方式,但无济于事,我仍然想将它们与多处理一起使用,所以我想出了一个有点丑陋的解决方法:
>>> def parallel(function):
... def temp(_):
... def apply(values):
... from multiprocessing import Pool
... pool = Pool(4)
... result = pool.map(function, values)
... pool.close()
... pool.join()
... return result
... return apply
... return temp
...
>>> def _square(value):
... return value*value
...
>>> @parallel(_square)
... def square(values):
... pass
...
>>> square([1,2,3,4])
[1, 4, 9, 16]
>>>
所以基本上我将真正的函数传递给装饰器,然后我使用第二个函数来处理这些值,正如你所看到的,它工作得很好。
我稍微修改了您的初始代码以更好地处理装饰器,尽管它并不完美。
import types, copy_reg, multiprocessing as mp
def parallel(f):
def executor(*args):
_pool = mp.Pool(2)
func = getattr(args[0], f.__name__) # This will get the actual method function so we can use our own pickling procedure
_result = _pool.map(func, args[1])
_pool.close()
_pool.join()
return _result
return executor
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
cls_name = ''
if func_name.startswith('__') and not func_name.endswith('__'):
cls_name = cls.__name__.lstrip('_')
if cls_name:
func_name = '_' + cls_name + func_name
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
func = None
for cls in cls.mro():
if func_name in cls.__dict__:
func = cls.__dict__[func_name] # This will fail with the decorator, since parSquare is being wrapped around as executor
break
else:
for attr in dir(cls):
prop = getattr(cls, attr)
if hasattr(prop, '__call__') and prop.__name__ == func_name:
func = cls.__dict__[attr]
break
if func == None:
raise KeyError("Couldn't find function %s withing %s" % (str(func_name), str(cls)))
return func.__get__(obj, cls)
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
class Foo(object):
def __init__(self, args):
self.my_args = args
def squareArg(self, arg):
return arg**2
def par_squareArg(self):
p = mp.Pool(2) # Replace 2 with the number of processors.
q = p.map(self.squareArg, self.my_args)
p.close()
p.join()
return q
@parallel
def parSquare(self, num):
return self.squareArg(num)
if __name__ == "__main__":
myfoo = Foo([1,2,3,4])
print myfoo.par_squareArg()
print myfoo.parSquare(myfoo.my_args)
从根本上讲,这仍然失败,给我们 AssertionError: daemonic processes are not allowed to have children
由于子进程尝试调用该函数,请记住,子进程并不真正复制代码,只是简单地复制名称...
一种解决方法与我之前提到的类似:
import types, copy_reg, multiprocessing as mp
def parallel(f):
def temp(_):
def executor(*args):
_pool = mp.Pool(2)
func = getattr(args[0], f.__name__) # This will get the actual method function so we can use our own pickling procedure
_result = _pool.map(func, args[1])
_pool.close()
_pool.join()
return _result
return executor
return temp
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
cls_name = ''
if func_name.startswith('__') and not func_name.endswith('__'):
cls_name = cls.__name__.lstrip('_')
if cls_name:
func_name = '_' + cls_name + func_name
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
func = None
for cls in cls.mro():
if func_name in cls.__dict__:
func = cls.__dict__[func_name] # This will fail with the decorator, since parSquare is being wrapped around as executor
break
else:
for attr in dir(cls):
prop = getattr(cls, attr)
if hasattr(prop, '__call__') and prop.__name__ == func_name:
func = cls.__dict__[attr]
break
if func == None:
raise KeyError("Couldn't find function %s withing %s" % (str(func_name), str(cls)))
return func.__get__(obj, cls)
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
class Foo(object):
def __init__(self, args):
self.my_args = args
def squareArg(self, arg):
return arg**2
def par_squareArg(self):
p = mp.Pool(2) # Replace 2 with the number of processors.
q = p.map(self.squareArg, self.my_args)
p.close()
p.join()
return q
def _parSquare(self, num):
return self.squareArg(num)
@parallel(_parSquare)
def parSquare(self, num):
pass
if __name__ == "__main__":
myfoo = Foo([1,2,3,4])
print myfoo.par_squareArg()
print myfoo.parSquare(myfoo.my_args)
[1, 4, 9, 16]
[1, 4, 9, 16]
最后一件事,在多线程处理时要非常小心,根据数据分段的方式,多线程处理的时间实际上可能比单线程处理的慢,这主要是由于来回复制值以及创建和销毁子进程的开销。
始终对单/多线程进行基准测试,并在可能的情况下正确分段数据。
举个例子:
import numpy
import time
from multiprocessing import Pool
def square(value):
return value*value
if __name__ == '__main__':
pool = Pool(5)
values = range(1000000)
start = time.time()
_ = pool.map(square, values)
pool.close()
pool.join()
end = time.time()
print "multithreaded time %f" % (end - start)
start = time.time()
_ = map(square, values)
end = time.time()
print "single threaded time %f" % (end - start)
start = time.time()
_ = numpy.asarray(values)**2
end = time.time()
print "numpy time %f" % (end - start)
v = numpy.asarray(values)
start = time.time()
_ = v**2
end = time.time()
print "numpy without pre-initialization %f" % (end - start)
给我们:
multithreaded time 0.484441
single threaded time 0.196421
numpy time 0.184163
numpy without pre-initialization 0.004490
关于Python:制作一个返回同一个类的方法的类方法装饰器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11731664/
我之前让 dll 注入(inject)器变得简单,但我有 Windows 7,我用 C# 和 C++ 做了它,它工作得很好!但是现在当我在 Windows 8 中尝试相同的代码时,它似乎没有以正确的方
我正在尝试制作一个名为 core-splitter 的元素,该元素在 1.0 中已弃用,因为它在我们的项目中起着关键作用。 如果您不知道 core-splitter 的作用,我可以提供一个简短的描述。
我有几个不同的蜘蛛,想一次运行所有它们。基于 this和 this ,我可以在同一个进程中运行多个蜘蛛。但是,我不知道如何设计一个信号系统来在所有蜘蛛都完成后停止 react 器。 我试过了: cra
有没有办法在达到特定条件时停止扭曲 react 器。例如,如果一个变量被设置为某个值,那么 react 器应该停止吗? 最佳答案 理想情况下,您不会将变量设置为一个值并停止 react 器,而是调用
https://code.angularjs.org/1.0.0rc9/angular-1.0.0rc9.js 上面的链接定义了外部js文件,我不知道Angular-1.0.0rc9.js的注入(in
我正在尝试运行一个函数并将服务注入(inject)其中。我认为这可以使用 $injector 轻松完成.所以我尝试了以下(简化示例): angular.injector().invoke( [ "$q
在 google Guice 中,我可以使用函数 createInjector 创建基于多个模块的注入(inject)器。 因为我使用 GWT.create 在 GoogleGin 中实例化注入(in
我在 ASP.NET Core 1.1 解决方案中使用配置绑定(bind)。基本上,我在“ConfigureServices Startup”部分中有一些用于绑定(bind)的简单代码,如下所示: s
我在 Spring MVC 中设置 initBinder 时遇到一些问题。我有一个 ModelAttribute,它有一个有时会显示的字段。 public class Model { privat
我正在尝试通过jquery post发布knockoutjs View 模型 var $form = $('#barcodeTemplate form'); var data = ko.toJS(vm
如何为包含多态对象集合的复杂模型编写自定义模型绑定(bind)程序? 我有下一个模型结构: public class CustomAttributeValueViewModel { publi
您好,我正在尝试实现我在 this article 中找到的扩展方法对于简单的注入(inject)器,因为它不支持开箱即用的特定构造函数的注册。 根据这篇文章,我需要用一个假的委托(delegate)
你好,我想自动注册我的依赖项。 我现在拥有的是: public interface IRepository where T : class public interface IFolderReposi
我正在使用 Jasmine 测试一些 Angular.js 代码。为此,我需要一个 Angular 注入(inject)器: var injector = angular.injector(['ng'
我正在使用 Matlab 代码生成器。不可能包含代码风格指南。这就是为什么我正在寻找一个工具来“ reshape ”、重命名和重新格式化生成的代码,根据我的: 功能横幅约定 文件横幅约定 命名约定 等
这个问题在这里已经有了答案: Where and why do I have to put the "template" and "typename" keywords? (8 个答案) 关闭 8
我开发了一种工具,可以更改某些程序的外观。为此,我需要在某些进程中注入(inject)一个 dll。 现在我基本上使用这个 approach .问题通常是人们无法注入(inject) dll,因为他们
我想使用 swing、spring 和 hibernate 编写一个 java 应用程序。 我想使用数据绑定(bind)器用 bean 的值填充 gui,并且我还希望它反射(reflect) gui
我有这段代码,当两个蜘蛛完成后,程序仍在运行。 #!C:\Python27\python.exe from twisted.internet import reactor from scrapy.cr
要点是 Spring Batch (v2) 测试框架具有带有 @Autowired 注释的 JobLauncherTestUtils.setJob。我们的测试套件有多个 Job 类提供者。因为这个类不
我是一名优秀的程序员,十分优秀!