gpt4 book ai didi

python - 写入多个输出文件的程序的流式包装器

转载 作者:行者123 更新时间:2023-11-28 22:36:11 25 4
gpt4 key购买 nike

有一个程序(我无法修改)创建两个输出文件。我正在尝试编写一个调用此程序的 Python 包装器,同时读取两个输出流,组合输出并打印到标准输出(以促进流式传输)。我怎样才能在没有死锁的情况下做到这一点?下面的概念证明工作正常,但当我将这种方法应用于实际程序时它会死锁。


概念验证:这是一个虚拟程序,bogus.py,它创建了两个输出文件,就像我试图包装的程序一样。

#!/usr/bin/env python
from __future__ import print_function
import sys
with open(sys.argv[1], 'w') as f1, open(sys.argv[2], 'w') as f2:
for i in range(1000):
if i % 2 == 0:
print(i, file=f1)
else:
print(i, file=f2)

这里是调用程序并组合其两个输出的 Python 包装器(每次交错 4 行)。

#!/usr/bin/env python
from __future__ import print_function
from contextlib import contextmanager
import os
import shutil
import subprocess
import tempfile

@contextmanager
def named_pipe():
"""
Create a temporary named pipe.

Stolen shamelessly from StackOverflow:
http://stackoverflow.com/a/28840955/459780
"""
dirname = tempfile.mkdtemp()
try:
path = os.path.join(dirname, 'named_pipe')
os.mkfifo(path)
yield path
finally:
shutil.rmtree(dirname)

with named_pipe() as f1, named_pipe() as f2:
cmd = ['./bogus.py', f1, f2]
child = subprocess.Popen(cmd)
with open(f1, 'r') as in1, open(f2, 'r') as in2:
buff = list()
for i, lines in enumerate(zip(in1, in2)):
line1 = lines[0].strip()
line2 = lines[1].strip()
print(line1)
buff.append(line2)
if len(buff) == 4:
for line in buff:
print(line)

最佳答案

I'm seeing big chunks of one file and then big chunks of the other file, regardless of whether I write to stdout, stderr, or tty.

如果您不能让 child 对文件使用行缓冲,那么一个简单的解决方案在输出可用时进程仍在运行时从输出文件中读取完整的交错行 是使用线程:

#!/usr/bin/env python2
from subprocess import Popen
from threading import Thread
from Queue import Queue

def readlines(path, queue):
try:
with open(path) as pipe:
for line in iter(pipe.readline, ''):
queue.put(line)
finally:
queue.put(None)

with named_pipes(n=2) as paths:
child = Popen(['python', 'child.py'] + paths)
queue = Queue()
for path in paths:
Thread(target=readlines, args=[path, queue]).start()
for _ in paths:
for line in iter(queue.get, None):
print line.rstrip('\n')

哪里named_pipes(n) is defined here .

pipe.readline() 对于 Python 2 上的非阻塞管道已损坏,这就是此处使用线程的原因。


打印一个文件中的一行,然后打印另一个文件中的一行:

with named_pipes(n=2) as paths:
child = Popen(['python', 'child.py'] + paths)
queues = [Queue() for _ in paths]
for path, queue in zip(paths, queues):
Thread(target=readlines, args=[path, queue]).start()
while queues:
for q in queues:
line = q.get()
if line is None: # EOF
queues.remove(q)
else:
print line.rstrip('\n')

如果 child.py 向一个文件写入的行多于另一个文件,那么差异将保留在内存中,因此 queues 中的各个队列可能无限增长,直到它们填满所有内存。您可以设置队列中项目的最大数量,但您必须将超时传递给 q.get() 否则代码可能会死锁。


如果您需要从一个输出文件中精确打印 4 行,然后从另一个输出文件中精确打印 4 行,等等,那么您可以稍微修改给定的代码示例:

    while queues:
# print 4 lines from one queue followed by 4 lines from another queue
for q in queues:
for _ in range(4):
line = q.get()
if line is None: # EOF
queues.remove(q)
break
else:
print line.rstrip('\n')

它不会死锁,但如果您的子进程将太多数据写入一个文件而没有向另一个文件写入足够的数据,它可能会吃掉所有内存(只有差异保留在内存中——如果文件相对相等;程序支持任意大的输出文件)。

关于python - 写入多个输出文件的程序的流式包装器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37672160/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com