gpt4 book ai didi

python - 在 pygmo 中使用队列进行函数评估

转载 作者:太空宇宙 更新时间:2023-11-03 14:43:22 28 4
gpt4 key购买 nike

我正在尝试使用pygmo 2.5通过 Anaconda3 安装的优化库以及一些现有代码,这些代码通过执行轨迹优化的可执行文件包装参数向量的异步和分布式评估(如果有人感兴趣的话,它是 POST2)。为了促进这一点,我在网络上使用 multiprocessing.SyncManager 和 multiprocessing.Queues 来传递输入并接收输出和日志消息。因此,在这种情况下,pygmo 将选择要尝试的向量,并且支持代码会将其传递到输入队列中,一些分布式工作人员将抓取该输入队列,通过可执行文件进行评估,并将结果传回,最终将返回给任何人pygmo.algorithm 用于评估

我的问题是,当 pygmo 初始化一个问题时,它会对所提供的类进行深层复制,在我的例子和下面提供的示例代码中,它包含多个队列。执行深层复制后出现错误

  File "pygmo_testing.py", line 121, in <module>
main()
File "pygmo_testing.py", line 108, in main
prob = pg.problem(my_prob)
File "C:\Anaconda3\lib\copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "C:\Anaconda3\lib\copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "C:\Anaconda3\lib\copy.py", line 150, in deepcopy
y = copier(x, memo)
File "C:\Anaconda3\lib\copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "C:\Anaconda3\lib\copy.py", line 169, in deepcopy
rv = reductor(4)
File "C:\Anaconda3\lib\multiprocessing\queues.py", line 58, in __getstate__
context.assert_spawning(self)
File "C:\Anaconda3\lib\multiprocessing\context.py", line 356, in assert_spawning
' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

有办法解决这个问题吗?我需要保持执行风格异步并分布式用于该代码使用的其他方法。为了完整性,我还尝试了queue.Queue和multiprocessing.Manager.Queue(这两者都不能与其他现有代码一起使用),但它总是归结为深层复制。

谢谢大家!

<小时/>
"""
****************************** Import Statements ******************************
"""
import pygmo as pg
from multiprocessing import Pool, Queue

"""
****************************** Utility Functions ******************************
"""
def sphere_fitness(x):
return sum(x*x)

def worker(inp_q, out_q):

while True:

x = inp_q.get()
print("got {}".format(x))
if x == False:
break
else:
fit = sphere_fitness(x)
print("x: {} f: {}".format(x, fit))
out_q.put_nowait(fit)
print("submitted {}".format(x))
"""
********************************** Class(es) ************************************
"""
class distributed_submit(object):
""" Class for pygmo Problem"""

def __init__(self, dim, inp_q, out_q):
self.dim = dim
self._inp_q = inp_q
self._out_q = out_q

def _submit(self, inp_q, x):
self._inp_q.put_nowait(x)
print("x delivered")

def _receive(self, out_q):
return self._out_q.get()

def fitness(self, x):
self._submit(x)
print("put in {}".format(x))
fit = self._receive()
print("got {}".format(fit))
return [fit]

def get_bounds(self):
return ([-1]*self.dim, [1]*self.dim)

def get_name(self):
return "Sphere Function"

def get_extra_info(self):
return "\tDimensions: {}".format(self.dim)


"""
******************************* Main Function ********************************
"""
def main():

# Queues from multiprocessing
_inp_q = Queue()
_out_q = Queue()

_workers = Pool(initializer=worker,
initargs=(_inp_q, _out_q))
_workers.close()

my_prob = distributed_submit(3, _inp_q, _out_q)

prob = pg.problem(my_prob)

algo = pg.algorithm(pg.bee_colony(gen=20, limit=20))

pop = pg.population(prob, 10)
print(pop)

pop = algo.evolve(pop)

print(pop.champion_f)


if __name__ == "__main__":
main()

最佳答案

对于任何发现此问题并需要答案的人 - collections.deque 支持深度复制

https://docs.python.org/3/library/collections.html#collections.deque

In addition to the above, deques support ... copy.copy(d), copy.deepcopy(d),

但是 collections.deque 不支持进程,因此它不能用于分发

关于python - 在 pygmo 中使用队列进行函数评估,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46429779/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com