gpt4 book ai didi

python:读取线程中的子进程输出

转载 作者:太空狗 更新时间:2023-10-29 18:30:52 24 4
gpt4 key购买 nike

我有一个使用 subprocess.Popen 调用的可执行文件。然后,我打算使用一个线程通过 stdin 向它提供一些数据,该线程从队列中读取其值,该队列稍后将填充到另一个线程中。应该在另一个线程中使用标准输出管道读取输出,并再次在队列中排序。

据我之前的研究了解,将线程与队列一起使用是一种很好的做法。

不幸的是,外部可执行文件不会快速给我输入的每一行的答案,因此简单的写入、读取行循环不是一个选项。可执行文件实现了一些内部多线程,我希望输出一可用,因此需要额外的读取器线程。

作为测试可执行文件的示例,它只会随机播放每一行 (shuffleline.py):

#!/usr/bin/python -u
import sys
from random import shuffle

for line in sys.stdin:
line = line.strip()

# shuffle line
line = list(line)
shuffle(line)
line = "".join(line)

sys.stdout.write("%s\n"%(line))
sys.stdout.flush() # avoid buffers

请注意,这已经尽可能无缓冲。或者不是吗?这是我精简的测试程序:

#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess

class WriteThread(threading.Thread):
def __init__(self, p_in, source_queue):
threading.Thread.__init__(self)
self.pipe = p_in
self.source_queue = source_queue

def run(self):
while True:
source = self.source_queue.get()
print "writing to process: ", repr(source)
self.pipe.write(source)
self.pipe.flush()
self.source_queue.task_done()

class ReadThread(threading.Thread):
def __init__(self, p_out, target_queue):
threading.Thread.__init__(self)
self.pipe = p_out
self.target_queue = target_queue

def run(self):
while True:
line = self.pipe.readline() # blocking read
if line == '':
break
print "reader read: ", line.rstrip()
self.target_queue.put(line)

if __name__ == "__main__":

cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

source_queue = Queue.Queue()
target_queue = Queue.Queue()

writer = WriteThread(proc.stdin, source_queue)
writer.setDaemon(True)
writer.start()

reader = ReadThread(proc.stdout, target_queue)
reader.setDaemon(True)
reader.start()

# populate queue
for i in range(10):
source_queue.put("string %s\n" %i)
source_queue.put("")

print "source_queue empty: ", source_queue.empty()
print "target_queue empty: ", target_queue.empty()

import time
time.sleep(2) # expect some output from reader thread

source_queue.join() # wait until all items in source_queue are processed
proc.stdin.close() # should end the subprocess
proc.wait()

这给出了以下输出(python2.7):

writing to process:  'string 0\n'
writing to process: 'string 1\n'
writing to process: 'string 2\n'
writing to process: 'string 3\n'
writing to process: 'string 4\n'
writing to process: 'string 5\n'
writing to process: 'string 6\n'
source_queue empty: writing to process: 'string 7\n'
writing to process: 'string 8\n'
writing to process: 'string 9\n'
writing to process: ''
True
target_queue empty: True

然后 2 秒内什么都没有...

reader read:  rgsn0i t
reader read: nrg1sti
reader read: tis n2rg
reader read: snt gri3
reader read: nsri4 tg
reader read: stir5 gn
reader read: gnri6ts
reader read: ngrits7
reader read: 8nsrt ig
reader read: sg9 nitr

开头的交错是意料之中的。然而,直到子进程结束之后,子进程的输出才会出现。随着更多行的管道输入,我得到了一些输出,因此我假设 stdout 管道中存在缓存问题。根据此处发布的其他问题,刷新标准输出(在子进程中)应该可以工作,至少在 Linux 上是这样。

最佳答案

您的问题与 subprocess 模块或线程(尽管存在问题)无关,甚至与混合子进程和线程无关(一个非常的坏主意,甚至更糟而不是一开始就使用线程,除非您使用的是 Python 3.2 子进程模块的反向端口,您可以从 code.google.com/p/python-subprocess32 获得)或从多个线程访问相同的内容(如您的 print 语句做。)

发生的是您的 shuffleline.py 程序缓冲区。不是输出,而是输入。虽然不是很明显,但当您遍历文件对象时,Python 将以 block 为单位读取,通常为 8k 字节。由于 sys.stdin 是一个文件对象,您的 for 循环将缓冲直到 EOF 或一个完整的 block :

for line in sys.stdin:
line = line.strip()
....

如果您不想这样做,可以使用 while 循环调用 sys.stdin.readline()(对于 EOF 返回 ''):

while True:
line = sys.stdin.readline()
if not line:
break
line = line.strip()
...

或使用 iter() 的双参数形式,它创建一个迭代器,该迭代器调用第一个参数直到返回第二个参数(“sentinel”):

for line in iter(sys.stdin.readline, ''):
line = line.strip()
...

如果我不建议为此不使用线程,而是在子进程的管道上使用非阻塞 I/O,或者甚至像 twisted.reactor.spawnProcess 这样具有将流程和其他事物作为消费者和生产者 Hook 在一起的许多方法。

关于python:读取线程中的子进程输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9811413/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com