gpt4 book ai didi

Python:从多个子进程异步打印标准输出

转载 作者:可可西里 更新时间:2023-11-01 09:29:01 24 4
gpt4 key购买 nike

我正在测试一种从 Python 2.7 中的多个子进程打印出标准输出的方法。我设置的是一个主进程,目前生成三个子进程并吐出它们的输出。每个子进程都是一个 for 循环,它会随机休眠一段时间,当它醒来时,会说“Slept for X seconds”。

我看到的问题是打印输出似乎是同步的。假设子进程 A 休眠 1 秒,子进程 B 休眠 3 秒,子进程 C 休眠 10 秒。主进程在尝试查看子进程 C 是否有内容时停止了整整 10 秒,即使其他两个进程可能已经休眠并打印了一些内容。这是为了模拟一个子进程是否真的比其他两个进程在更长的时间内没有输出。

我需要一个适用于 Windows 的解决方案。

我的代码如下:

主进程.py

import sys
import subprocess

logfile = open('logfile.txt', 'w')
processes = [
subprocess.Popen('python subproc_1.py', stdout=subprocess.PIPE, bufsize=1),
subprocess.Popen('python subproc_2.py', stdout=subprocess.PIPE, bufsize=1),
subprocess.Popen('python subproc_3.py', stdout=subprocess.PIPE, bufsize=1),
]


while True:
line = processes[0].stdout.readline()
if line != '':
sys.stdout.write(line)
logfile.write(line)

line = processes[1].stdout.readline()
if line != '':
sys.stdout.write(line)
logfile.write(line)

line = processes[2].stdout.readline()
if line != '':
sys.stdout.write(line)
logfile.write(line)

#If everyone is dead, break
if processes[0].poll() is not None and \
processes[1].poll() is not None and \
processes[2].poll() is not None:
break

processes[0].wait()
processes[1].wait()

print 'Done'

subproc_1.py/subproc_2.py/subproc_3.py

import time, sys, random

sleep_time = random.random() * 3
for x in range(0, 20):
print "[PROC1] Slept for {0} seconds".format(sleep_time)
sys.stdout.flush()
time.sleep(sleep_time)
sleep_time = random.random() * 3 #this is different for each subprocess.

更新:解决方案

将下面的答案与 this question 结合起来,这是应该的。

import sys
import subprocess
from threading import Thread

try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty # for Python 3.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()

if __name__ == '__main__':
logfile = open('logfile.txt', 'w')
processes = [
subprocess.Popen('python subproc_1.py', stdout=subprocess.PIPE, bufsize=1),
subprocess.Popen('python subproc_2.py', stdout=subprocess.PIPE, bufsize=1),
subprocess.Popen('python subproc_3.py', stdout=subprocess.PIPE, bufsize=1),
]
q = Queue()
threads = []
for p in processes:
threads.append(Thread(target=enqueue_output, args=(p.stdout, q)))

for t in threads:
t.daemon = True
t.start()

while True:
try:
line = q.get_nowait()
except Empty:
pass
else:
sys.stdout.write(line)
logfile.write(line)
logfile.flush()

#break when all processes are done.
if all(p.poll() is not None for p in processes):
break

print 'All processes done'

我不确定在 while 循环结束时是否需要任何清理代码。如果有人对此有意见,请添加。

每个 subproc 脚本看起来都与此类似(我为了制作更好的示例而进行了编辑):

import datetime, time, sys, random

for x in range(0, 20):
sleep_time = random.random() * 3
time.sleep(sleep_time)
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%H%M%S.%f')
print "[{0}][PROC1] Slept for {1} seconds".format(timestamp, sleep_time)
sys.stdout.flush()

print "[{0}][PROC1] Done".format(timestamp)
sys.stdout.flush()

最佳答案

您的问题来自于 readline() 是一个阻塞函数;如果你在一个文件对象上调用它并且没有一行等待被读取,那么直到有一行输出时调用才会返回。因此,您现在拥有的将按此顺序从子进程 1、2 和 3 重复读取,在每个子进程处暂停,直到输出准备就绪。

(编辑:OP 澄清说它们在 Windows 上,这使得下面的内容不适用。)

如果你想从任何准备好的输出流中读取,你需要以非阻塞方式检查流的状态,使用 select 模块,然后尝试只读取那些准备好了。 select 提供了执行此操作的多种方法,但为了示例起见,我们将使用 select.select()。启动您的子流程后,您将拥有类似的内容:

streams = [p.stdout for p in processes]

def output(s):
for f in [sys.stdout, logfile]:
f.write(s)
f.flush()

while True:
rstreams, _, _ = select.select(streams, [], [])
for stream in rstreams:
line = stream.readline()
output(line)
if all(p.poll() is not None for p in processes):
break

for stream in streams:
output(stream.read())

select() 的作用是,当使用三个文件对象(或文件描述符)列表调用时,返回其参数的三个子集,即准备好读取的流,就绪写,或​​有一个错误条件。因此,在循环的每次迭代中,我们都会检查哪些输出流已准备好读取,并只对这些进行迭代。然后我们重复。 (请注意,在这里对输出进行行缓冲很重要;上面的代码假定如果流已准备好读取,则至少有一个完整的行可以读取。如果您指定不同的缓冲,则上述代码可能会阻塞。)

您的原始代码还有一个问题:当您在 poll() 报告所有子进程已退出后退出循环时,您可能没有读取它们的所有输出。因此,您需要对流进行最后一次扫描以读取任何剩余的输出。

注意:我给出的示例代码并没有尽一切努力来准确地按照可用的顺序捕获子进程的输出(这不可能做到完美,但可以比上面的管理更接近去做)。它还缺乏其他改进(例如,在主循环中,它会继续选择每个子进程的标准输出,即使在某些子进程已经终止之后,这是无害的,但效率低下)。它只是为了说明一种非阻塞 IO 的基本技术。

关于Python:从多个子进程异步打印标准输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22565606/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com