gpt4 book ai didi

Python并行编程模型

转载 作者:行者123 更新时间:2023-11-30 09:50:53 24 4
gpt4 key购买 nike

我正在编写一个包含以下组件的机器学习程序:

  1. 具有二叉树数据结构的共享“经验池”。

  2. N 个模拟器进程。每个人每隔一段时间就会向池中添加一个“体验对象”。池负责平衡其树。

  3. M 个学习者进程每隔几分钟从池中采样一批“体验对象”并执行任何学习过程。

我不知道实现上述内容的最佳方法是什么。我没有使用 Tensorflow,因此无法利用其并行功能。更具体地说,

  • 我首先想到的是 Python3 的内置多处理库。然而,与多线程不同的是,多处理模块不能让不同的进程更新同一个全局对象。我的直觉是我应该使用服务器代理模型。有人可以给我一个粗略的框架代码吗?
  • MPI4py 是更好的解决方案吗?
  • 还有其他更适合的库吗?我研究过 celerydisque 等。对于我来说,如何使它们适应我的用例并不明显。

最佳答案

根据评论,您真正需要的是一种从执行 CPU 密集型任务的一组进程中更新共享对象的方法。 CPU 限制使多处理成为一个显而易见的选择 - 如果您的大部分工作都是 IO 限制,那么多线程将是一个更简单的选择。

您的问题遵循更简单的服务器-客户端模型:客户端将服务器用作简单的有状态存储,不需要任何子进程之间的通信,并且不需要同步任何进程。

因此,最简单的方法是:

  1. 启动一个包含服务器的单独进程。
  2. 在服务器逻辑内部,提供更新和读取单个对象的方法。
  3. 将模拟器和学习者进程视为单独的客户端,可以定期读取和更新全局状态。

从服务器的角度来看,客户端的身份并不重要——重要的是他们的操作。

因此,这可以通过使用 customised manager 来完成在多处理中如下:

# server.py

from multiprocessing.managers import BaseManager
# this represents the data structure you've already implemented.
from ... import ExperienceTree

# An important note: the way proxy objects work is by shared weak reference to
# the object. If all of your workers die, it takes your proxy object with
# it. Thus, if you have an instance, the instance is garbage-collected
# once all references to it have been erased. I have chosen to sidestep
# this in my code by using class variables and objects so that instances
# are never used - you may define __init__, etc. if you so wish, but
# just be aware of what will happen to your object once all workers are gone.
class ExperiencePool(object):

tree = ExperienceTree()

@classmethod
def update(cls, experience_object):
''' Implement methods to update the tree with an experience object. '''
cls.tree.update(experience_object)

@classmethod
def sample(cls):
''' Implement methods to sample the tree's experience objects. '''
return cls.tree.sample()

# subclass base manager
class Server(BaseManager):
pass

# register the class you just created - now you can access an instance of
# ExperiencePool using Server.Shared_Experience_Pool().
Server.register('Shared_Experience_Pool', ExperiencePool)

if __name__ == '__main__':
# run the server on port 8080 of your own machine
with Server(('localhost', 8080), authkey=b'none') as server_process:
server_process.get_server().serve_forever()

现在,对于所有客户,您可以这样做:

# client.py - you can always have a separate client file for a learner and a simulator.

from multiprocessing.managers import BaseManager
from server import ExperiencePool

class Server(BaseManager):
pass

Server.register('Shared_Experience_Pool', ExperiencePool)

if __name__ == '__main__':
# run the server on port 8080 of your own machine forever.
server_process = Server(('localhost', 8080), authkey=b'none')
server_process.connect()
experience_pool = server_process.Shared_Experience_Pool()
# now do your own thing and call `experience_call.sample()` or `update` whenever you want.

然后,您可以根据需要启动一个 server.py 和任意数量的 workers

这是最好的设计吗?

并非总是如此。您可能会遇到竞争条件,因为如果您的学习者被迫与同时写入的模拟器节点竞争,他们可能会收到过时或旧的数据。

如果您想确保优先选择最新写入,您还可以使用 lock每当您的模拟器尝试写入某些内容时,都会阻止其他进程在写入完成之前进行读取。

关于Python并行编程模型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45339279/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com