I'm creating a Python script to launch many processes via the CLI, one process per file found in a certain directory. I just loop over the files and launch the process that works on each file.
for path in pathlist:
    # Prepare cli call
    p = subprocess.Popen(cmd...)
    processes.append(p)
I'm also adding all processes to a list to wait for them at the end of the script. Since there can be 100s of files I don't want to overload the CPU and make things slower due to too many context switches. Plus at some point memory will also become a limiting factor.
How can I control the "logic" above so that it does not "flood" the CPU/OS and slow things down?
You should use the higher-level interface provided by concurrent.futures, which provides an Executor class that is parametrized by the maximum number of workers you want. It handles the logic of managing these workers.
import concurrent.futures
import subprocess

MAX_WORKERS = 5

def worker(i):
    with subprocess.Popen(f"sleep 3 && echo {i}", shell=True) as p:
        # Do some stuff if you want with the Popen object.
        # The with block waits for the subprocess to terminate
        # before exiting.
        pass

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = []
    for i in range(25):
        futures.append(executor.submit(worker, i=i))
    concurrent.futures.wait(futures)

print("ALL FINISHED")
Note, I wrapped the call to Popen in a function that uses a context manager (with statement) to invoke it because by default it is non-blocking. You are going to want to wait for the processes to finish. I don't know why you are using subprocess.Popen, so I kept it in case you really need it, but generally, you should stick to subprocess.run for most straightforward cases. It blocks until the command finishes by default. In that case, you could simplify it to:
import concurrent.futures
import subprocess

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    for i in range(25):
        future = executor.submit(
            subprocess.run,
            args=f"sleep 3 && echo {i}",
            shell=True
        )
        futures.append(future)
    concurrent.futures.wait(futures)

print("ALL FINISHED")
Note, I am using concurrent.futures.wait, which will wait until all futures have completed.
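Each future created by executor.submit(subprocess.run, ...) resolves to a subprocess.CompletedProcess, so the exit codes can be inspected once the work is done. Here is a minimal sketch of that idea. The helper name run_all and the stand-in commands (Python one-liners run through sys.executable) are assumptions for illustration, not part of the original question.

```python
import concurrent.futures
import subprocess
import sys


def run_all(commands, max_workers=5):
    """Run each command with at most max_workers at once; return {index: exit code}."""
    exit_codes = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the index of the command it runs.
        future_to_index = {
            executor.submit(subprocess.run, cmd): i
            for i, cmd in enumerate(commands)
        }
        # as_completed yields futures in the order they finish, not submission order.
        for future in concurrent.futures.as_completed(future_to_index):
            completed = future.result()  # a subprocess.CompletedProcess
            exit_codes[future_to_index[future]] = completed.returncode
    return exit_codes


if __name__ == "__main__":
    # Hypothetical stand-in commands: child number i exits with status i.
    commands = [[sys.executable, "-c", f"import sys; sys.exit({i})"] for i in range(4)]
    print(run_all(commands, max_workers=2))
```

If you would rather have a failing command raise an exception, you could pass check=True to subprocess.run, in which case future.result() re-raises the subprocess.CalledProcessError.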
For your case, I think you are going to have something like:
import concurrent.futures
import subprocess

N_WORKERS = 5

files = get_files_somehow()  # placeholder: however you collect your paths

with concurrent.futures.ThreadPoolExecutor(max_workers=N_WORKERS) as executor:
    futures = []
    for path in files:
        future = executor.submit(
            subprocess.run,
            args=["foo", path],  # "foo" stands for your CLI command
        )
        futures.append(future)
    concurrent.futures.wait(futures)

print("ALL FINISHED")
Play around with N_WORKERS to find a suitable value for your use case.
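One way to "play around" is to time the same batch with a few different worker counts. The following is only a rough sketch: the helper name time_batch and the sleeping stand-in commands are assumptions, and a workload that merely sleeps says nothing about CPU or memory pressure, so you would substitute your real command before drawing conclusions.

```python
import concurrent.futures
import subprocess
import sys
import time


def time_batch(commands, n_workers):
    """Run all commands with n_workers concurrent slots; return elapsed seconds."""
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as executor:
        # list() forces every subprocess.run call to finish before the clock stops.
        results = list(executor.map(subprocess.run, commands))
    elapsed = time.perf_counter() - start
    # Fail loudly if any child exited with a non-zero status.
    for result in results:
        result.check_returncode()
    return elapsed


if __name__ == "__main__":
    # Hypothetical workload: eight children that each sleep for 0.2 seconds.
    commands = [[sys.executable, "-c", "import time; time.sleep(0.2)"]] * 8
    for n_workers in (1, 2, 4, 8):
        print(f"{n_workers} workers: {time_batch(commands, n_workers):.2f}s")
```

For CPU-bound commands, the timings will probably stop improving somewhere around the number of cores reported by os.cpu_count().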
I came up with this possible solution, good enough for the script I'm making:
processes = []
for path in pathlist:
    # Prepare cli call
    p = subprocess.Popen(cmd...)
    processes.append(p)
    if len(processes) >= 8:
        processes[0].wait()
        del processes[0]

for proc in processes:
    proc.wait()
If the limit is reached, I wait for the "oldest" process that was submitted and then remove it from the list. Only then is the next one submitted. At the end, I wait for the remaining ones to complete.
Here is the code:
# Set a limit for the number of concurrent processes
max_processes = 10

# Initialize a counter for running processes
current_processes = 0

for path in pathlist:
    # Check if the maximum number of processes is reached
    while current_processes >= max_processes:
        time.sleep(1)  # Wait for a while before checking again

    # Prepare cli call
    p = subprocess.Popen(cmd...)
    processes.append(p)
    current_processes += 1

# Wait for all processes to finish
for p in processes:
    p.wait()
I don't see any code that would decrease the current_processes variable? Your code will go into an infinite loop if current_processes >= max_processes is ever True.
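One possible way to avoid that infinite loop is to drop the separate counter and instead track the live processes in a list, pruning it with Popen.poll() while waiting for a free slot. This is only a sketch under assumptions: the function name run_limited, its parameters, and the stand-in commands are hypothetical and not taken from the answer above.

```python
import subprocess
import sys
import time


def run_limited(commands, max_processes=10, poll_interval=0.1):
    """Launch every command, keeping at most max_processes children alive at once."""
    running = []   # processes not yet observed to have exited
    finished = []  # processes whose exit has been observed
    for cmd in commands:
        # Block until a slot is free. Unlike a counter that only goes up,
        # the running list shrinks as children exit.
        while len(running) >= max_processes:
            # poll() returns None while the child is alive and sets returncode otherwise.
            finished += [p for p in running if p.poll() is not None]
            running = [p for p in running if p.returncode is None]
            if len(running) >= max_processes:
                time.sleep(poll_interval)  # nothing finished yet; check again shortly
        running.append(subprocess.Popen(cmd))
    # Wait for whatever is still running at the end.
    for p in running:
        p.wait()
    # Exit codes, in the order completion was observed (not submission order).
    return [p.returncode for p in finished + running]


if __name__ == "__main__":
    # Hypothetical stand-in commands: six children that each sleep briefly.
    commands = [[sys.executable, "-c", "import time; time.sleep(0.2)"]] * 6
    print(run_limited(commands, max_processes=2))
```

Because any finished process frees a slot, a slow early process does not hold up the launch of later ones.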