gpt4 book ai didi

python - 从多个子进程进行非阻塞读取(Python)

转载 作者:行者123 更新时间:2023-11-28 18:47:35 24 4
gpt4 key购买 nike

我目前有以下代码,灵感来自 Non-blocking read on a subprocess.PIPE in python 的答案.它似乎工作正常,将行输出到屏幕,但它只对第一个创建的进程这样做,所有其他进程(正在运行)不会打印任何数据。

如何确保我可以(以非阻塞方式)从多个子进程读取数据?

#!/usr/bin/env python
import sys
import os
import subprocess
from threading import Thread
from Queue import Queue, Empty

STREAMER_URL = 'rtmp://127.0.0.1/app'
RTMPDUMP_EXECUTEABLE = 'rtmpdump'

def enqueue_output(out, queue):
for line in iter(lambda: out.read(16), b''):
queue.put(line)
out.close()

def download_rtmp(media, filename):
# Create parameters
args=[RTMPDUMP_EXECUTEABLE]
args.extend(['-r',media[0],'-y',media[1]])

# Set output file
OUTPUT_FILE = filename
args.extend(['-o',OUTPUT_FILE])

# Send rtmpdump any extra arguments
if len(sys.argv) > 2:
args.extend(sys.argv[2:])

# Execute rtmpdump
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
return (p, q, t)

def main():

# actual data is from somewhere else on the internet
for (name, playpath, filepath) in data:
print 'Spawning %s download...' % name
PROCESSES.append(download_rtmp((STREAMER_URL, playpath), filepath))

BUFS = dict()

# infinite loop checking if all processes have finished
while True:
done = True
for (process, queue, thread) in PROCESSES:
try:
readdata = queue.get_nowait()
except Empty:
pass
else:
if process in BUFS:
readdata = BUFS[process] + readdata
lines = readdata.split('\n')
if len(lines) > 1:
for line in lines[:-1]:
print 'Line: %s' % line
if '\r' in lines[-1]:
lines = readdata.split('\r')
for line in lines[:-1]:
print 'Line2: %s' % line
BUFS[process] = lines[-1]

process.poll()

if process.returncode is None:
done = False
break
if done:
break

print "Done"

if __name__ == "__main__":
main()

最佳答案

我还没有弄清楚整个事情,但是 if process.returncode is None: 中的中断意味着在第一个进程完全退出之前,您不会查看其他进程队列。而且我不确定你从哪里得到多队列轮询的东西,但它绝对可怕。

这个问题最好通过所有工作线程使用的单个返回队列来解决。工作人员传递(进程,行)的元组,主线程阻塞等待来自所有工作人员的数据。

这确实是伪代码,但它看起来像:

STREAMER_URL = 'rtmp://127.0.0.1/app'
RTMPDUMP_EXECUTEABLE = 'rtmpdump'

def enqueue_output(process, queue):
"""read process stdout in small chunks and queue for processing"""
for line in iter(lambda: out.read(16), b''):
queue.put((process, line))
process.wait()
queue.put((process, None))

def download_rtmp(media, filename):
# Create parameters
args=[RTMPDUMP_EXECUTEABLE, '-r', media[0], '-y', media[1], '-o', filename]

# Send rtmpdump any extra arguments
# if len(sys.argv) > 2: no need for the if in list comprehension
args.extend(sys.argv[2:])

# Execute rtmpdump
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
t = Thread(target=enqueue_output, args=(p, return_q))
t.daemon = True # thread dies with the program
t.start()
return (p, t)

def main():
THREADS = []
BUFS = dict()

# actual data is from somewhere else on the internet
for (name, playpath, filepath) in data:
print 'Spawning %s download...' % name
process, thread = download_rtmp((STREAMER_URL, playpath), filepath)
BUFS[process] = ''
THREADS.append(thread)

# all processes write to return_q and we process them here
while BUFS:
process, line = return_q.get()
readdata = BUFS[process] + (line or '')
if line is None:
del BUFS[process]
# I didn't try to figure this part out... basically, when line is
# None, process is removed from BUFS so you know your end condition
# and the following stuff should do its final processing.
lines = readdata.split('\n')
if len(lines) > 1:
for line in lines[:-1]:
print 'Line: %s' % line
if '\r' in lines[-1]:
lines = readdata.split('\r')
for line in lines[:-1]:
print 'Line2: %s' % line
if line is not None:
BUFS[process] = lines[-1]

关于python - 从多个子进程进行非阻塞读取(Python),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17667004/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com