gpt4 book ai didi

multithreading - 在Python中使用动态生成的MPI深度优先搜索

转载 作者:行者123 更新时间:2023-12-03 13:17:08 25 4
gpt4 key购买 nike

好的,所以我想在树状结构中进行多线程深度优先搜索。我为此使用来自群集中多台计算机的线程(本例中为localhost四核和raspberry pi 2)。主线程应启动该进程,并在树中的第一个拆分处为它拆分为的每个节点生成一个新线程。然后,这些线程应该能够将其发现报告给主服务器。

我试图动态地执行此操作,而不是为mpiexec提供多个线程,因为我不知道树的样子,例如(可能会有2或9个拆分)。

我从正在研究此项目的项目中提取了一个样本,并按以下方式进行工作。它从一串数字中取出一个数字,并为每个数字生成一个线程并将该数字发送到该线程。

对于母版:

#!/usr/bin/python
from mpi4py import MPI
import datetime, sys, numpy, time

################ Set up MPI variables ################

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
name = MPI.Get_processor_name()
status = MPI.Status()

################ Master code ################

script = 'cpi.py'
for d in '34':
try:
print 'Trying to spawn child process...'
icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
spawnrank = icomm.Get_rank()
icomm.send(d, dest=spawnrank, tag=11)
print 'Spawned rank %d.' % spawnrank
except: ValueError('Spawn failed to start.')

solved = False
while solved == False:
#while not comm.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG):
# print 'spawns doing some work...'
# time.sleep(1)
solved = comm.recv(source=MPI.ANY_SOURCE, tag=22)
print 'received solution: %d' % solved

它正确地产生了 worker ,他们收到了数字,但没有将其发送回主人。 worker 的代码如下:

worker
#!/usr/bin/python
from mpi4py import MPI
import datetime, sys, numpy

################ Set up MPI variables ################

icomm = MPI.Comm.Get_parent()
comm = MPI.COMM_WORLD
irank = comm.Get_rank()
rank = comm.Get_rank()

running = True
while running:
data = None
data = icomm.recv(source=0, tag=11)
if data:
print 'Trying to send %s from worker rank %d to %d' % (data, rank, irank)
icomm.send(data, dest=0, tag=22)
break
print 'Worker on rank %d done.' % rank
icomm.Disconnect()

它永远不会到达主代码的最后一行。我还在主代码上添加了(注释掉了)一个探针,以检查带有标签22的消息是否卡在某处,排除了recv函数中的错误,但是该探针从未找到该消息。因此,我认为它永远不会发送。

通过打印两个进程的等级 他们都使用了等级0 ,这很有意义,因为它们是在同一台计算机上生成的。但是,当我添加一个主机文件和一个rankfile,试图强制它为从属计算机使用另一台计算机时,它给了我以下错误:
[hch-K55A:06917] *** Process received signal ***
[hch-K55A:06917] Signal: Segmentation fault (11)
[hch-K55A:06917] Signal code: Address not mapped (1)
[hch-K55A:06917] Failing at address: 0x3c
[hch-K55A:06917] [ 0] /lib/x86_64-linux-gnu/libpthread.so.0(+0x10340) [0x7f2c0d864340]
[hch-K55A:06917] [ 1] /usr/lib/openmpi/lib/openmpi/mca_rmaps_rank_file.so(orte_rmaps_rank_file_lex+0x4a0) [0x7f2c0abdcb70]
[hch-K55A:06917] [ 2] /usr/lib/openmpi/lib/openmpi/mca_rmaps_rank_file.so(+0x23ac) [0x7f2c0abda3ac]
[hch-K55A:06917] [ 3] /usr/lib/libopen-rte.so.4(orte_rmaps_base_map_job+0x2e) [0x7f2c0dacd05e]
[hch-K55A:06917] [ 4] /usr/lib/libopen-rte.so.4(orte_plm_base_setup_job+0x5a) [0x7f2c0dac580a]
[hch-K55A:06917] [ 5] /usr/lib/openmpi/lib/openmpi/mca_plm_rsh.so(orte_plm_rsh_launch+0x338) [0x7f2c0b80a8c8]
[hch-K55A:06917] [ 6] /usr/lib/libopen-rte.so.4(+0x51ff4) [0x7f2c0dac3ff4]
[hch-K55A:06917] [ 7] /usr/lib/libopen-rte.so.4(opal_event_base_loop+0x31e) [0x7f2c0dae9cfe]
[hch-K55A:06917] [ 8] mpiexec() [0x4047d3]
[hch-K55A:06917] [ 9] mpiexec() [0x40347d]
[hch-K55A:06917] [10] /lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0xf5) [0x7f2c0d4b0ec5]
[hch-K55A:06917] [11] mpiexec() [0x403399]
[hch-K55A:06917] *** End of error message ***
Segmentation fault (core dumped)

使用的命令:mpiexec -np 1 --hostfile hostfile --rankfile rankfile python spawntest.py

主机文件:
本地主机
本地主机插槽= 1个最大插槽= 4个
pi2 @ raspi2插槽= 4

排名文件:
等级0 =本地主机插槽= 1
等级1 = pi2 @ raspi2插槽= 1-4

所以我的问题如下:在能够来回发送数据的同时,如何在主计算机以外的其他计算机上生成这些线程?

最佳答案

您的主人的代码非常错误,我感到您对那里发生的事情缺乏概念上的理解。

MPI_COMM_SPAWN(或其对应的mpi4py comm.Spawn())产生的作业中的MPI进程不会成为父级MPI_COMM_WORLD的一部分。生成的过程形成了一个完全独立的世界通信器,并通过一个通信器与父作业互连,这正是生成的返回结果。在您的情况下,icomm = MPI.COMM_SELF.Spawn(...)是主进程中的交互器句柄。子作业中的进程使用MPI_COMM_GET_PARENT(在mpi4py中为MPI.Comm.Get_parent())获取互连器句柄。由于您正在生成单进程作业:

MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
^^^^^^^^^^

在新成立的子工作世界通讯者中,只有一个过程,因此 MPI.COMM_WORLD.Get_rank()在每个 worker 中返回零。

主机代码的这一部分是错误的,但由于互连器的实际工作方式,它仍然可以正常工作:
spawnrank = icomm.Get_rank() # <--- not what you expect
icomm.send(d, dest=spawnrank, tag=11)

相互通信者将两个独立的过程组链接在一起。其中一个称为本地组,另一个称为远程组。在对讲机上使用 MPI_COMM_RANK( comm.Get_rank())时,您会在 本地组中获得调用过程的等级。但是,在发送或接收时,指定的等级与 远程组有关。在您的情况下,产生一个新的工作程序会导致以下内部通信程序:

    mastet's MPI_COMM_SELF           child's MPI_COMM_WORLD
| |
+=============|================================|=============+
| +----------V----------+ +-------------V----------+ |
| | group of the master | | group of the child job | |
| | [ 0 ] | | [ 0 ] | |
| +---------------------+ +------------------------+ |
| intercommunicator |
+============================================================+

(上面的交流者显示每个组的来源;交流者本身不属于交流者)

哪个组是本地组,哪个组是远程组,取决于调用进程属于哪个组。主进程的本地组是子作业中排名的远程组,反之亦然。重要的是,每个组的等级为0,因为一个组中至少有一个进程。您很幸运,主控组中只有一个进程,因此 icomm.Get_rank()返回0(由于主控机的本地组是从 MPI_COMM_SELF派生的,该组始终包含一个进程,因此它总是返回零),这种情况总是发生是远程(子级)组中的有效等级。正确的做法是将消息发送到远程组中已知的固定级别,例如rank 0:
   icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
icomm.send(d, dest=0, tag=11)

(此代码明确发送给远程组的 0排名,而在此之前 0值只是一个幸运的巧合)

就是说,发送部分-尽管不正确-仍然有效。接收部分没有,原因有很多。首先,您使用了错误的通信器-从 MPI_COMM_WORLD接收消息无效,因为子进程不是它的成员。实际上,MPI中的通信器是不可变的-如果不创建新的通信器,则无法添加或删除等级。您应该使用 icomm从工作人员处接收,就像使用发送给他们的方法一样。现在,出现了第二个问题-主文件中的 icomm被每个新的 Spawn覆盖,因此您实际上失去了与除上一个作业之外的任何子作业进行通信的能力。您需要保留一个句柄列表并将其附加到句柄。

接收部分比较复杂。没有 MPI_ANY_COMM-您无法进行覆盖所有子作业的接收操作,因为所有子作业都生活在各自的对讲机中。您应该在对讲机列表上使用 MPI_IPROBE循环,或者(更好)开始从每个 child 开始非阻塞接收,然后使用 MPI_WAIT_SOME(与mpi4py等效)。

通过循环,主代码应如下所示(注意-未经测试的代码,我没有和/或使用mpi4py):
#!/usr/bin/python
from mpi4py import MPI
import datetime, sys, numpy, time

################ Set up MPI variables ################

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
name = MPI.Get_processor_name()
status = MPI.Status()

################ Master code ################

icomms = []
script = 'cpi.py'
for d in '34':
try:
print 'Trying to spawn child process...'
icomm = MPI.COMM_SELF.Spawn(sys.executable, args=[script], maxprocs=1, root=0)
icomm.send(d, dest=0, tag=11)
icomms.append(icomm)
print 'Spawned a child.'
except: ValueError('Spawn failed to start.')

solved = False
while not solved and icomms:
for icomm in icomms:
if icomm.Iprobe(source=0, tag=MPI.ANY_TAG):
print 'A child responded...'
solved = icomm.recv(source=0, tag=MPI.ANY_TAG)
icomm.Disconnect()
icomms.remove(icomm)
if solved: break
if not solved:
print 'spawns doing some work...'
time.sleep(1)
# make sure all pending sends get matched
for icomm in icomms:
icomm.recv(source=0, tag=MPI.ANY_TAG)
icomm.Disconnect()
print 'received solution: %d' % solved

我希望你能明白。

另外:如果您从生成的作业中生成作业,则新的 child 将无法轻松地建立与顶级母版的连接。为此,您应该转到MPI-2客户端/服务器模型支持的晦涩部分,让主服务器使用 MPI_PORT_OPEN打开端口,然后使用 MPI_PUBLISH_NAME向MPI命名服务注册它,最后使用 MPI_COMM_ACCEPT接收来自任何端口的连接其他MPI工作。工作人员应使用 MPI_LOOKUP_NAME获取对端口的引用,并使用 MPI_COMM_CONNECT与主作业建立对讲机。我不知道这些函数的包装器是否存在于mpi4py中,如果存在,如何命名。

关于multithreading - 在Python中使用动态生成的MPI深度优先搜索,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37374563/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com