gpt4 book ai didi

python - 如何使用 asyncio 使用 SubprocessProtocol 从子进程中读取并在任意点终止该子进程?

转载 作者:太空宇宙 更新时间:2023-11-04 04:09:49 28 4
gpt4 key购买 nike

使用答案 here作为基础(使用 SubprocessProtocol),我只是尝试从子进程中读取并在我选择的点停止读取(并终止子进程)(例如,我已经读取了足够的数据).

请注意,我确实想要使用 run_until_complete 的好处 per another discussion .

我碰巧使用的是 Windows,下面的示例使用的是来自 Cygwin 的 cat。我使用的实际实用程序只是一个 native Windows 控制台应用程序 - 但它会流式传输直到手动关闭。

我可以很好地读取数据,但是我尝试停止读取并关闭子进程(例如,从 pipe_data_received() 中调用 loop.stop())导致异常( RuntimeError:事件循环已关闭ValueError:关闭管道上的 I/O 操作)。我想立即优雅地终止子进程。

我认为这与其说是平台,不如说是我不知道在哪里可以适本地打断事情以达到预期的效果。关于如何实现这一目标的任何想法?

我的 Python 3.7+ 代码(根据示例修改):

import asyncio
import os

external_program = "cat" # Something that will output to stdio
external_option = "a" # An arbitrarily large amount of data
saved_data = []

class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
data_len = len(data)
print(''.join(' {:02x}'.format(x) for x in data), flush=True)
saved_data.extend(data)
if len(saved_data) > 512: # Stop once we've read this much data
loop.call_soon_threadsafe(loop.stop)
def connection_lost(self, exc):
print("Connection lost")
loop.stop() # end loop.run_forever()

print("START")

if os.name == 'nt':
# On Windows, the ProactorEventLoop is necessary to listen on pipes
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()

try:
loop.run_until_complete(
loop.subprocess_exec(
SubprocessProtocol,
external_program,
external_option,
)
)
loop.run_forever()
finally:
loop.close()

print("DONE")
loop.close()

最佳答案

不是 asyncio 专家,但像这样的东西应该有用。

import time
import asyncio
import threading

class SubprocessProtocol(asyncio.SubprocessProtocol):
def __init__(self, loop):
self.transport = None
self.loop = loop

def pipe_data_received(self, fd, data):
print('data received')

def connection_lost(self, exc):
print("Connection lost")

def connection_made(self, transport):
print("Connection made")

self.transport = transport

# becasue calc won't call pipe_data_received method.
t = threading.Thread(target=self._endme)
t.setDaemon(True)
t.start()

def _endme(self):
time.sleep(2)
# You'd normally use these inside pipe_data_received, connection_lost methods
self.transport.close()
self.loop.stop()


def main():
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
loop.run_until_complete(loop.subprocess_exec(
lambda: SubprocessProtocol(loop),
'calc.exe'
))

loop.run_forever()
loop.close()

if __name__ == "__main__":
main()

关于python - 如何使用 asyncio 使用 SubprocessProtocol 从子进程中读取并在任意点终止该子进程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56523496/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com