
python - Executing concurrent and separate SEAWAT/MODFLOW model runs with Python's multiprocessing module

Reposted · Author: IT老高 · Updated: 2023-10-28 21:13:11

I am trying to complete 100 model runs on my 8-processor, 64-bit Windows 7 machine. I would like to run 7 instances of the model concurrently to cut down my total run time (each model run takes approximately 9.5 minutes). I have looked at several threads pertaining to Python's multiprocessing module, but am still missing something.

Using the multiprocessing module

How to spawn parallel child processes on a multi-processor system?

Python Multiprocessing queue

My process:

I have 100 different parameter sets I would like to run through SEAWAT/MODFLOW to compare the results. I have pre-built the model input files for each model run and stored them in their own directories. What I would like to be able to do is run 7 models at a time until all realizations have been completed. There needn't be communication between processes or display of results. So far I have only been able to spawn the models sequentially:

import os, subprocess
import multiprocessing as mp

ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
files = []
for f in os.listdir(ws + r'\fieldgen\reals'):
    if f.endswith('.npy'):
        files.append(f)

## def work(cmd):
##     return subprocess.call(cmd, shell=False)

def run(f, def_param=ws):
    real = f.split('_')[2].split('.')[0]
    print 'Realization %s' % real

    mf2k = r'c:\modflow\mf2k.1_19\bin\mf2k.exe '
    mf2k5 = r'c:\modflow\MF2005_1_8\bin\mf2005.exe '
    seawatV4 = r'c:\modflow\swt_v4_00_04\exe\swt_v4.exe '
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '

    exe = seawatV4x64
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real

    os.system(exe + swt_nam)


if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count() - 1)  # leave 1 processor available for system and other processes
    tasks = range(len(files))
    results = []
    for f in files:
        r = p.map_async(run(f), tasks, callback=results.append)

I changed the if __name__ == 'main': block to the following in hopes that it would fix the lack of parallelism I feel is being imparted on the above script by the for loop. However, the model fails to even run (no Python error):
if __name__ == '__main__':
    p = mp.Pool(processes=mp.cpu_count() - 1)  # leave 1 processor available for system and other processes
    p.map_async(run, ((files[f],) for f in range(len(files))))

Any and all help is greatly appreciated!

Edit 3/26/2012 13:31 EST

Using the "manual pool" method in @J.F. Sebastian's answer below, I get parallel execution of my external .exe. Model realizations are called up in batches of 8 at a time, but it doesn't wait for those 8 runs to complete before calling up the next batch, and so on:
from __future__ import print_function
import os, subprocess, sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread

def run(f, ws):
    real = f.split('_')[-1].split('.')[0]
    print('Realization %s' % real)
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
    subprocess.check_call([seawatV4x64, swt_nam])

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e:  # catch exceptions to avoid exiting the
                                # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\reals')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
    for t in threads:
        t.daemon = True  # threads die if the program dies
        t.start()

    for _ in threads: q.put_nowait(None)  # signal no more files
    for t in threads: t.join()            # wait for completion

if __name__ == '__main__':

    mp.freeze_support()  # optional if the program is not frozen
    main()

No error traceback is available. The run() function performs its duty when called upon a single model realization file, just as it does with multiple files. The only difference is that with multiple files it is called len(files) times, yet each of the instances immediately closes and only one model run is allowed to finish, at which point the script exits gracefully (exit code 0).

Adding some print statements to main() reveals some information about active thread counts as well as thread status (note that this is a test on only 8 of the realization files to make the screenshot more manageable; theoretically all 8 files should be run concurrently, however the behavior continues where they are spawned and immediately die, except one):
def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\test')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count())]
    for t in threads:
        t.daemon = True  # threads die if the program dies
        t.start()
    print('Active Count a', threading.activeCount())
    for _ in threads:
        print(_)
        q.put_nowait(None)  # signal no more files
    for t in threads:
        print(t)
        t.join()  # wait for completion
    print('Active Count b', threading.activeCount())

[screenshot of the thread-status printout]

The "D:\\Data\\Users..." line is the error information thrown when I manually stop the model from running to completion. Once I stop the model running, the remaining thread status lines are reported and the script exits.

Edit 3/26/2012 16:24 EST

SEAWAT does allow concurrent execution, as I've done this in the past, spawning instances manually using iPython and launching from each model file folder. This time around, I'm launching all model runs from a single location, namely the directory where my script resides. It looks like the culprit may be in the way SEAWAT saves some of its output. When SEAWAT is run, it immediately creates files pertaining to the model run. One of these files was not being saved to the directory in which the model realization is located, but in the top directory where the script is located. This was preventing any subsequent threads from saving the same file name in the same location (which they all wanted to do, since these file names are generic and non-specific to each realization). The SEAWAT windows were not staying open long enough for me to read or even see that there was an error message; I only realized this when I went back and tried to run the code using iPython, which directly displays SEAWAT's printout instead of opening a new window to run the program.
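The collision and its fix can be demonstrated in isolation: launching each child process with subprocess's cwd argument makes it write its scratch files into its own directory rather than the script's. A minimal sketch, using a Python one-liner as a stand-in for the SEAWAT executable:

```python
import os
import subprocess
import sys
import tempfile

# Each stand-in "model run" writes a generically named scratch file to
# its current working directory, just as SEAWAT does on startup.
child = "open('scratch.out', 'w').write('done')"

dirs = [tempfile.mkdtemp(), tempfile.mkdtemp()]
for d in dirs:
    # cwd= starts the child in its own directory, so the identically
    # named scratch files no longer collide in the launch directory
    subprocess.check_call([sys.executable, '-c', child], cwd=d)

print(all(os.path.exists(os.path.join(d, 'scratch.out')) for d in dirs))
# → True
```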

I'm accepting @J.F. Sebastian's answer, as it's likely that once I resolve this model-executable issue, the threading code he has provided will get me where I need to be.

FINAL CODE

Added the cwd argument in subprocess.check_call to start each instance of SEAWAT in its own directory. Very key.
from __future__ import print_function
import os, subprocess, sys
import multiprocessing as mp
from Queue import Queue
from threading import Thread
import threading

def run(f, ws):
    real = f.split('_')[-1].split('.')[0]
    print('Realization %s' % real)
    seawatV4x64 = r'c:\modflow\swt_v4_00_04\exe\swt_v4x64.exe '
    cwd = ws + r'\reals\real%s\ss' % real
    swt_nam = ws + r'\reals\real%s\ss\ss.nam_swt' % real
    subprocess.check_call([seawatV4x64, swt_nam], cwd=cwd)

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e:  # catch exceptions to avoid exiting the
                                # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    wdir = os.path.join(ws, r'fieldgen\reals')
    q = Queue()
    for f in os.listdir(wdir):
        if f.endswith('.npy'):
            q.put_nowait((os.path.join(wdir, f), ws))

    # start threads
    threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count() - 1)]
    for t in threads:
        t.daemon = True  # threads die if the program dies
        t.start()
    for _ in threads: q.put_nowait(None)  # signal no more files
    for t in threads: t.join()            # wait for completion

if __name__ == '__main__':
    mp.freeze_support()  # optional if the program is not frozen
    main()

Best Answer

I don't see any computations in the Python code. If you just need to execute several external programs in parallel, it is sufficient to use subprocess to run the programs and the threading module to maintain a constant number of processes running, but the simplest code is using multiprocessing.Pool:

#!/usr/bin/env python
import os
import multiprocessing as mp

def run(filename_def_param):
    filename, def_param = filename_def_param  # unpack arguments
    ...  # call external program on `filename`

def safe_run(*args, **kwargs):
    """Call run(), catch exceptions."""
    try: run(*args, **kwargs)
    except Exception as e:
        print("error: %s run(*%r, **%r)" % (e, args, kwargs))

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    workdir = os.path.join(ws, r'fieldgen\reals')
    files = ((os.path.join(workdir, f), ws)
             for f in os.listdir(workdir) if f.endswith('.npy'))

    # start processes
    pool = mp.Pool()  # use all available CPUs
    pool.map(safe_run, files)

if __name__ == "__main__":
    mp.freeze_support()  # optional if the program is not frozen
    main()

If there are many files then pool.map() could be replaced by for _ in pool.imap_unordered(safe_run, files): pass.
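A minimal sketch of that replacement, with a trivial stand-in for safe_run (a real version would launch the external program on each file):

```python
import multiprocessing as mp

def safe_run(filename):
    # stand-in for launching the external program on `filename`
    return filename.replace('.npy', '')

def main():
    files = ['real_1.npy', 'real_2.npy', 'real_3.npy']
    pool = mp.Pool()
    # imap_unordered() consumes `files` lazily and yields each result as
    # soon as any worker finishes, so nothing accumulates in memory
    done = sorted(pool.imap_unordered(safe_run, files))
    pool.close()
    pool.join()
    return done

if __name__ == '__main__':
    print(main())  # → ['real_1', 'real_2', 'real_3']
```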

There is also multiprocessing.dummy.Pool, which provides the same interface as multiprocessing.Pool but uses threads instead of processes, which might be more appropriate in this case.
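A sketch of the thread-backed variant; only the import changes. Threads are a reasonable fit here because the workers spend their time waiting on an external process rather than computing in Python:

```python
from multiprocessing.dummy import Pool  # thread pool; same API as mp.Pool

def safe_run(args):
    filename, ws = args
    # a real version would call subprocess.check_call on the model here
    return filename

files = [('real_1.npy', 'ws'), ('real_2.npy', 'ws')]
pool = Pool(8)  # 8 worker threads
results = pool.map(safe_run, files)
pool.close()
pool.join()
print(results)  # → ['real_1.npy', 'real_2.npy']
```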

You don't need to keep some CPUs free. Just use a command that starts your executables with a low priority (on Linux it is the nice program).
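As a sketch, a hypothetical helper that prefixes a command with nice on POSIX systems; on Windows one would instead pass a priority flag to subprocess (the creationflags argument accepts constants such as subprocess.BELOW_NORMAL_PRIORITY_CLASS on Python 3.7+):

```python
import os

def low_priority(cmd):
    """Hypothetical helper: wrap `cmd` so the scheduler deprioritizes it.

    On POSIX, prefix the command with `nice`; elsewhere return it
    unchanged (the caller would pass a priority creationflag instead).
    """
    if os.name == 'posix':
        return ['nice', '-n', '19'] + list(cmd)
    return list(cmd)

print(low_priority(['swt_v4x64.exe', 'ss.nam_swt']))
```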

ThreadPoolExecutor example

concurrent.futures.ThreadPoolExecutor would be both simple and sufficient, but it requires a 3rd-party dependency on Python 2.x (it has been in the stdlib since Python 3.2).
#!/usr/bin/env python
import os
import concurrent.futures

def run(filename, def_param):
    ...  # call external program on `filename`

# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
files = (os.path.join(wdir, f) for f in os.listdir(wdir) if f.endswith('.npy'))

# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    future_to_file = dict((executor.submit(run, f, ws), f) for f in files)

    for future in concurrent.futures.as_completed(future_to_file):
        f = future_to_file[future]
        if future.exception() is not None:
            print('%r generated an exception: %s' % (f, future.exception()))
        # run() doesn't return anything so `future.result()` is always `None`

Or if we ignore exceptions raised by run():
from itertools import repeat

...  # the same

# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    executor.map(run, files, repeat(ws))
    # run() doesn't return anything so `map()` results can be ignored
subprocess + threading (manual pool) solution
#!/usr/bin/env python
from __future__ import print_function
import os
import subprocess
import sys
from Queue import Queue
from threading import Thread

def run(filename, def_param):
    ...  # define exe, swt_nam
    subprocess.check_call([exe, swt_nam])  # run external program

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e:  # catch exceptions to avoid exiting the
                                # thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)

# start threads
q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
for t in threads:
    t.daemon = True  # threads die if the program dies
    t.start()

# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
for f in os.listdir(wdir):
    if f.endswith('.npy'):
        q.put_nowait((os.path.join(wdir, f), ws))

for _ in threads: q.put_nowait(None)  # signal no more files
for t in threads: t.join()            # wait for completion

Regarding "python - Executing concurrent and separate SEAWAT/MODFLOW model runs with Python's multiprocessing module", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/9874042/
