
python - Programming DEAP with SCOOP

Reposted · Author: 太空宇宙 · Updated: 2023-11-04 02:57:04

I'm using the DEAP library in Python to solve a multi-objective optimization problem. I'd like to use multiple processors for this task; however, I've run into some trouble.

For some background: I'm using networkx in combination with DEAP. I've also defined the fitness function, crossover, and mutation functions (which I won't show here for certain reasons).

It says here that all I need to do is install SCOOP and add the lines

from scoop import futures

toolbox.register("map", futures.map)

However, I get the following error:

scoop._comm.scoopexceptions.ReferenceBroken: 'module' object has no attribute 'Chromosome'

After some digging, I found that I need to move the creator.create calls into the main module, as described here.

After doing that, I get another error:

scoop._comm.scoopexceptions.ReferenceBroken: This element could not be pickled: FutureId(worker='127.0.0.1:49663', rank=1):partial(<Chromosome representation of a solution here>)=None

I'm not very familiar with parallel computing, and I don't quite understand what "could not be pickled" means. The full code, with some edits, is below:

def genetic(network, creator, no_sensors, sfpd, lambda1, lambda2, lambda3, k):
    locations = network.graph.nodes()
    # move creator.create calls to the main module
    ########################################
    creator.create("FitnessMax", base.Fitness, weights=(lambda1, -lambda2, lambda3))
    creator.create("Chromosome", list, fitness=creator.FitnessMax)
    ########################################

    toolbox = base.Toolbox()
    toolbox.register("attr_item", random.sample, locations, no_sensors)
    toolbox.register("chromosome", tools.initRepeat, creator.Chromosome, toolbox.attr_item, n=1)
    toolbox.register("population", tools.initRepeat, list, toolbox.chromosome)

    toolbox.register("map", futures.map)  ####### <-- this line ##############

    def evaluate(chromosome):
        # fitness function defined here
        pass

    # Crossover
    def crossover(chromosome1, chromosome2):  # Uniform Crossover
        # crossover is defined here
        pass

    # Mutation
    def mutation(chromosome):
        # mutation is defined here
        pass

    toolbox.register("evaluate", evaluate)
    toolbox.register("mate", crossover)
    toolbox.register("mutate", mutation)
    toolbox.register("select", tools.selNSGA2)

    random.seed(64)
    pop = toolbox.population(n=MU)
    hof = tools.ParetoFront()
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", numpy.mean, axis=0)
    stats.register("min", numpy.min, axis=0)
    stats.register("max", numpy.max, axis=0)

    algorithms.eaMuPlusLambda(pop, toolbox, MU, LAMBDA, CXPB, MUTPB, NGEN, stats, halloffame=hof)

    return list(hof)
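For reference, "could not be pickled" means Python's serializer could not turn the object into bytes to ship to a worker process. Classes created at runtime inside a function, which is what creator.create produces when called inside genetic, have no importable name, so a worker cannot reconstruct them. A minimal stdlib-only illustration of the difference (no DEAP or SCOOP involved; the class names here are hypothetical):

```python
import pickle

# A class defined at module level can be pickled by reference:
# another process simply re-imports it by name.
class ModuleLevel:
    pass

def make_local_class():
    # A class created inside a function body (much like what
    # creator.create does when invoked inside genetic) has no
    # importable module-level name, so pickling it fails.
    class LocalLevel:
        pass
    return LocalLevel

# Round-trips fine:
restored = pickle.loads(pickle.dumps(ModuleLevel()))

# Fails: the class cannot be located by name for unpickling.
try:
    pickle.dumps(make_local_class()())
    local_pickle_ok = True
except (pickle.PicklingError, AttributeError):
    local_pickle_ok = False
```

This is why moving the creator.create calls to the top level of the main module helps: the generated class then has a stable, importable name.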

Thanks; any insight would be greatly appreciated.

Best Answer

Here is a workaround using joblib and dill.

First: monkeypatch joblib so that it pickles with dill:

import dill
from dill import Pickler
import joblib
joblib.parallel.pickle = dill
joblib.pool.dumps = dill.dumps
joblib.pool.Pickler = Pickler

from joblib.pool import CustomizablePicklingQueue
from io import BytesIO
from pickle import HIGHEST_PROTOCOL


class CustomizablePickler(Pickler):
    """Pickler that accepts custom reducers.

    HIGHEST_PROTOCOL is selected by default as this pickler is used
    to pickle ephemeral datastructures for interprocess communication,
    hence no backward compatibility is required.

    `reducers` is expected to be a dictionary with key/values
    being `(type, callable)` pairs where `callable` is a function that,
    given an instance of `type`, will return a tuple `(constructor,
    tuple_of_objects)` to rebuild an instance out of the pickled
    `tuple_of_objects` as a `__reduce__` method would. See the
    standard library documentation on pickling for more details.
    """

    # We override the pure Python pickler as it's the only way to be able to
    # customize the dispatch table without side effects in Python 2.6
    # to 3.2. For Python 3.3+ we leverage the new dispatch_table
    # feature from http://bugs.python.org/issue14166 that makes it possible
    # to use the C implementation of the Pickler, which is faster.

    def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL):
        Pickler.__init__(self, writer, protocol=protocol)
        if reducers is None:
            reducers = {}
        # Make the dispatch registry an instance-level attribute instead of
        # a reference to the class dictionary under Python 2
        self.dispatch = Pickler.dispatch.copy()
        for type, reduce_func in reducers.items():
            self.register(type, reduce_func)

    def register(self, type, reduce_func):
        if hasattr(Pickler, 'dispatch'):
            # Python 2 pickler dispatching is not explicitly customizable.
            # Let us use a closure to work around this limitation.
            def dispatcher(self, obj):
                reduced = reduce_func(obj)
                self.save_reduce(obj=obj, *reduced)
            self.dispatch[type] = dispatcher
        else:
            self.dispatch_table[type] = reduce_func

joblib.pool.CustomizablePickler = CustomizablePickler


def _make_methods(self):
    self._recv = recv = self._reader.recv
    racquire, rrelease = self._rlock.acquire, self._rlock.release

    def get():
        racquire()
        try:
            return recv()
        finally:
            rrelease()

    self.get = get

    def send(obj):
        buffer = BytesIO()
        CustomizablePickler(buffer, self._reducers).dump(obj)
        self._writer.send_bytes(buffer.getvalue())

    self._send = send

    if self._wlock is None:
        # writes to a message-oriented win32 pipe are atomic
        self.put = send
    else:
        wlock_acquire, wlock_release = (
            self._wlock.acquire, self._wlock.release)

        def put(obj):
            wlock_acquire()
            try:
                return send(obj)
            finally:
                wlock_release()

        self.put = put

CustomizablePicklingQueue._make_methods = _make_methods
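The reason dill is swapped in: the standard library pickle serializes functions and classes by reference (an importable module-plus-name), so lambdas, closures, and dynamically created classes, exactly the things DEAP's creator produces, cannot be shipped to workers with the stdlib pickler, while dill serializes them by value. A quick stdlib-only check of that limitation (dill itself is not needed to observe it):

```python
import pickle

# A lambda has qualname "<lambda>", which cannot be looked up by name
# in its module, so the stdlib pickler refuses it.
square = lambda x: x * x

try:
    pickle.dumps(square)
    lambda_pickle_ok = True
except (pickle.PicklingError, AttributeError, TypeError):
    lambda_pickle_ok = False
```

dill (like cloudpickle) works around this by capturing the function's code object and closure instead of a name reference.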

Second: define a parallel map built on joblib:

from joblib import Parallel, delayed

def mymap(f, *iters):
    return Parallel(n_jobs=-1)(delayed(f)(*args) for args in zip(*iters))

Finally, just register the map:

toolbox.register("map", mymap)
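For intuition, mymap has the same calling convention as the builtin map over one or more iterables; joblib just evaluates the calls in parallel worker processes. A serial stand-in with identical semantics (the name serial_map is illustrative, not part of joblib or DEAP):

```python
def serial_map(f, *iters):
    # Same zip-based fan-out as mymap, minus the parallelism: each call
    # receives one element from each iterable, stopping at the shortest.
    return [f(*args) for args in zip(*iters)]

print(serial_map(pow, [2, 3, 4], [3, 2, 2]))  # [8, 9, 16]
```

Note that both versions return a list, which is what DEAP expects from its registered map.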

It works perfectly with the example you linked. You can integrate dask with joblib to scale this solution out to a cluster. With dask-drmaa you get almost the same functionality that SCOOP provides.

Example code can be found here.

Regarding python - Programming DEAP with SCOOP, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/42051318/
