- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我需要一种方法来读取 Popen 创建的流中所有当前可用的字符,或者找出缓冲区中剩余的字符数。
背景:我想用 Python 远程控制一个交互式应用程序。到目前为止,我使用 Popen 创建了一个新的子进程:
process=subprocess.Popen(["python"],shell=True,stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE, cwd=workingDir)
(我不是真的开始python,但实际的交互界面是相似的。)目前我读取了 1 个字节,直到我检测到进程已到达命令提示符:
output = ""
while output[-6:]!="SCIP> ":
output += process.stdout.read(1)
sys.stdout.write(output[-1])
return output
然后我通过 process.stdin.write("command\n")
开始一个冗长的计算。我的问题是,我无法检查计算是否完成,因为我无法检查流中的最后一个字符是否是提示符。 read()
或 read(n)
阻塞我的线程,直到它到达 EOF,它永远不会,因为交互式程序在被告知之前不会结束。以上述循环的方式寻找提示也行不通,因为提示只会在计算之后出现。
理想的解决方案是让我从流中读取所有可用字符,如果没有可读取的内容,则立即返回一个空字符串。
最佳答案
Popen 的标准输出的增量解析真的不是问题。只需将管道插入线程并让它通过输出擦洗,寻找分隔符。根据您的喜好,它可以将其通过管道传输到另一个管道/文件中,或者以异步模式将解析的“ block ”放在“堆栈”上。下面是一个基于自定义分隔符的 stdout 异步“分 block ”示例:
import cStringIO
import uuid
import threading
import os
class InputStreamChunker(threading.Thread):
'''
Threaded object / code that mediates reading output from a stream,
detects "separation markers" in the stream and spits out chunks
of original stream, split when ends of chunk are encountered.
Results are made available as a list of filled file-like objects
(your choice). Results are accessible either "asynchronously"
(you can poll at will for results in a non-blocking way) or
"synchronously" by exposing a "subscribe and wait" system based
on threading.Event flags.
Usage:
- instantiate this object
- give our input pipe as "stdout" to other subprocess and start it:
Popen(..., stdout = th.input, ...)
- (optional) subscribe to data_available event
- pull resulting file-like objects off .data
(if you are "messing" with .data from outside of the thread,
be curteous and wrap the thread-unsafe manipulations between:
obj.data_unoccupied.clear()
... mess with .data
obj.data_unoccupied.set()
The thread will not touch obj.data for the duration and will
block reading.)
License: Public domain
Absolutely no warranty provided
'''
def __init__(self, delimiter = None, outputObjConstructor = None):
'''
delimiter - the string that will be considered a delimiter for the stream
outputObjConstructor - instanses of these will be attached to self.data array
(intantiator_pointer, args, kw)
'''
super(InputStreamChunker,self).__init__()
self._data_available = threading.Event()
self._data_available.clear() # parent will .wait() on this for results.
self._data = []
self._data_unoccupied = threading.Event()
self._data_unoccupied.set() # parent will set this to true when self.results is being changed from outside
self._r, self._w = os.pipe() # takes all inputs. self.input = public pipe in.
self._stop = False
if not delimiter: delimiter = str(uuid.uuid1())
self._stream_delimiter = [l for l in delimiter]
self._stream_roll_back_len = ( len(delimiter)-1 ) * -1
if not outputObjConstructor:
self._obj = (cStringIO.StringIO, (), {})
else:
self._obj = outputObjConstructor
@property
def data_available(self):
'''returns a threading.Event instance pointer that is
True (and non-blocking to .wait() ) when we attached a
new IO obj to the .data array.
Code consuming the array may decide to set it back to False
if it's done with all chunks and wants to be blocked on .wait()'''
return self._data_available
@property
def data_unoccupied(self):
'''returns a threading.Event instance pointer that is normally
True (and non-blocking to .wait() ) Set it to False with .clear()
before you start non-thread-safe manipulations (changing) .data
array. Set it back to True with .set() when you are done'''
return self._data_unoccupied
@property
def data(self):
'''returns a list of input chunkes (file-like objects) captured
so far. This is a "stack" of sorts. Code consuming the chunks
would be responsible for disposing of the file-like objects.
By default, the file-like objects are instances of cStringIO'''
return self._data
@property
def input(self):
'''This is a file descriptor (not a file-like).
It's the input end of our pipe which you give to other process
to be used as stdout pipe for that process'''
return self._w
def flush(self):
'''Normally a read on a pipe is blocking.
To get things moving (make the subprocess yield the buffer,
we inject our chunk delimiter into self.input
This is useful when primary subprocess does not write anything
to our in pipe, but we need to make internal pipe reader let go
of the pipe and move on with things.
'''
os.write(self._w, ''.join(self._stream_delimiter))
def stop(self):
self._stop = True
self.flush() # reader has its teeth on the pipe. This makes it let go for for a sec.
os.close(self._w)
self._data_available.set()
def __del__(self):
try:
self.stop()
except:
pass
try:
del self._w
del self._r
del self._data
except:
pass
def run(self):
''' Plan:
- We read into a fresh instance of IO obj until marker encountered.
- When marker is detected, we attach that IO obj to "results" array
and signal the calling code (through threading.Event flag) that
results are available
- repeat until .stop() was called on the thread.
'''
marker = ['' for l in self._stream_delimiter] # '' is there on purpose
tf = self._obj[0](*self._obj[1], **self._obj[2])
while not self._stop:
l = os.read(self._r, 1)
print('Thread talking: Ordinal of char is:%s' %ord(l))
trash_str = marker.pop(0)
marker.append(l)
if marker != self._stream_delimiter:
tf.write(l)
else:
# chopping off the marker first
tf.seek(self._stream_roll_back_len, 2)
tf.truncate()
tf.seek(0)
self._data_unoccupied.wait(5) # seriously, how much time is needed to get your items off the stack?
self._data.append(tf)
self._data_available.set()
tf = self._obj[0](*self._obj[1], **self._obj[2])
os.close(self._r)
tf.close()
del tf
def waitforresults(ch, answers, expect):
while len(answers) < expect:
ch.data_available.wait(0.5); ch.data_unoccupied.clear()
while ch.data:
answers.append(ch.data.pop(0))
ch.data_available.clear(); ch.data_unoccupied.set()
print('Main talking: %s answers received, expecting %s\n' % ( len(answers), expect) )
def test():
'''
- set up chunker
- set up Popen with chunker's output stream
- push some data into proc.stdin
- get results
- cleanup
'''
import subprocess
ch = InputStreamChunker('\n')
ch.daemon = True
ch.start()
print('starting the subprocess\n')
p = subprocess.Popen(
['cat'],
stdin = subprocess.PIPE,
stdout = ch.input,
stderr = subprocess.PIPE)
answers = []
i = p.stdin
i.write('line1 qwer\n') # will be in results
i.write('line2 qwer\n') # will be in results
i.write('line3 zxcv asdf') # will be in results only after a ch.flush(),
# prepended to other line or when the pipe is closed
waitforresults(ch, answers, expect = 2)
i.write('line4 tyui\n') # will be in results
i.write('line5 hjkl\n') # will be in results
i.write('line6 mnbv') # will be in results only after a ch.flush(),
# prepended to other line or when the pipe is closed
waitforresults(ch, answers, expect = 4)
## now we will flush the rest of input (that last line did not have a delimiter)
i.close()
ch.flush()
waitforresults(ch, answers, expect = 5)
should_be = ['line1 qwer', 'line2 qwer',
'line3 zxcv asdfline4 tyui', 'line5 hjkl', 'line6 mnbv']
assert should_be == [i.read() for i in answers]
# don't forget to stop the chunker. It it closes the pipes
p.terminate()
ch.stop()
del p, ch
if __name__ == '__main__':
test()
编辑:删除了关于“写入 proc 的标准输入是一次性的”的错误措辞
关于python - 如何从 subprocess.Popen.stdout 读取所有可用数据(非阻塞)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3076542/
对于一个简单的聊天程序,我使用了一个通过 boost::python 包装的 c 库。 使用 PyQT 编写了一个简单的 GUI。接收消息是通过阻塞调用完成的lib说。对于独立刷新的 GUI,通信部分
当我创建以下内容时,我试图创建一个可以被异常终止的线程类(因为我试图让线程等待一个事件): import sys class testThread(threading.Thread): def
我正在用 Haskell 编写服务器,我想在客户端断开连接后显式关闭它们。当我调用 hClose ,线程将阻塞,直到客户端关闭其一侧的句柄。有没有办法让它在不阻塞的情况下关闭? 提前致谢! 最佳答案
这个问题已经有答案了: 已关闭12 年前。 Possible Duplicate: garbage collection Operation 我有几个相关问题。 1.JAVA垃圾收集器运行时,是否占用
我有一个 Angular 函数,它在初始 URL 中查找“列表”参数,如果找到,就会出去获取信息。否则我想获得地理位置。如果存在 URL 参数,我不想获取地理位置。我使用的术语是否正确? constr
我读了很多关于锁定数据库、表和行的文章,但我想要较低的锁定,比如只锁定“操作”,我不知道如何调用它,假设我在 php 中有函数: function update_table() { //que
在我的多线程 mfc 应用程序中,m_view->SetScrollPos 处于阻塞状态并且所有应用程序都被卡住。 View 是在另一个线程中创建的,这是这种行为的原因吗? //SetScrollPo
FreeSwitch 软件在几天内运行良好(~3 - 5 天),然后由于 FreeSwitch 被阻止,新的来电请求被接受!!正在进行的调用继续他们的 session ,他们的调用似乎没有受到影响,但
我有一组按钮,当鼠标悬停在这些按钮上时,它们会改变颜色。这些的 CSS 以这种方式运行: #navsite ul li button { height: 60px; width: 60
由于某些原因,当我调用 WSARecvFrom 时,该函数在接收到某些内容之前不会返回。 _socket = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, N
我了解一些关于 Oracle 阻塞的知识——更新如何阻塞其他更新直到事务完成,写入者如何不阻塞读取者等。 我理解悲观和乐观锁定的概念,以及有关丢失更新等典型银行教科书示例。 我也理解 JDBC 事务隔
在两个代码点之间,我是否可以判断进程是否已被内核抢占,或者更确切地说,当时是否有任何其他代码在同一处理器上运行? //Point A some_type capture = some_capture(
这是我在 Oracle 的面试问题。 有一个堆栈,即使堆栈已满,push 操作也应该等到它完成,即使堆栈为空,pop 操作也应该等到它完成。 我们怎样才能做到这一点? 我的回答 让一个线程做push
我想知道是否有人可以告诉我如何有效地使用循环平铺/循环阻塞进行大型密集矩阵乘法。我正在用 1000x1000 矩阵做C = AB。我按照 Wikipedia 上的循环平铺示例进行操作,但使用平铺得到的
我正在阅读有关绿色线程的内容,并且能够理解这些线程是由 VM 或在运行时创建的,而不是由操作系统创建的,但我无法理解以下语句 When a green thread executes a blocki
我正在创建的 JavaScript API 具有以下结构: var engine = new Engine({ engineName: "TestEngine", engineHost
ChildWindow 是一个模态窗口,但它不会阻塞。有没有办法让它阻塞?我基本上想要一个 ShowDialog() 方法,该方法将调用 ChildWindow.Show() 但在用户关闭 Child
我需要一些关于如何调试 10.6 版本下的 Cocoa 并发问题的指导。我正在将“for”循环转换为使用 NSOperations,但大多数时候,代码只是在循环的某个时刻卡住。我可以在控制台中看到 N
我正在使用 ReportViewer 控件和自定义打印作业工作流程,这给我带来了一些问题。我的代码看起来有点像这样: ids.ForEach(delegate(Guid? guid)
我有以下成功复制文件的代码。但是,它有两个问题: progressBar.setValue() 之后的 System.out.println() 不会打印 0 到 100 之间的间隔(仅打印“0”直到
我是一名优秀的程序员,十分优秀!