gpt4 book ai didi

process - 如何在不阻塞Rust的情况下读取子进程的输出?

转载 作者:行者123 更新时间:2023-12-03 11:39:59 27 4
gpt4 key购买 nike

我正在Rust中制作一个小的ncurses应用程序,该应用程序需要与子进程进行通信。我已经有一个用Common Lisp编写的原型(prototype)。我试图重写它,因为CL为这么小的工具使用了大量的内存。

我在弄清楚如何与子流程进行交互时遇到了一些麻烦。

我目前正在做的大致是这样的:

  • 创建过程:
    let mut program = match Command::new(command)
    .args(arguments)
    .stdin(Stdio::piped())
    .stdout(Stdio::piped())
    .stderr(Stdio::piped())
    .spawn()
    {
    Ok(child) => child,
    Err(_) => {
    println!("Cannot run program '{}'.", command);
    return;
    }
    };
  • 将其传递到无限循环(直到用户退出),该循环读取和处理输入,并像这样监听输出(并将其写入屏幕):
    fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
    match program.stdout {
    Some(ref mut out) => {
    let mut buf_string = String::new();
    match out.read_to_string(&mut buf_string) {
    Ok(_) => output_viewer.append_string(buf_string),
    Err(_) => return,
    };
    }
    None => return,
    };
    }

  • 但是,对 read_to_string的调用将阻止程序,直到进程退出。从我可以看到的 read_to_endread似乎也阻止。如果我尝试运行立即退出的类似于 ls的东西,它可以工作,但是对于没有退出的诸如 pythonsbcl的东西,它只有在我手动杀死该子进程后才继续。

    基于 this answer,我将代码更改为使用 BufReader:
        fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
    match program.stdout.as_mut() {
    Some(out) => {
    let buf_reader = BufReader::new(out);
    for line in buf_reader.lines() {
    match line {
    Ok(l) => {
    output_viewer.append_string(l);
    }
    Err(_) => return,
    };
    }
    }
    None => return,
    }
    }

    但是,问题仍然存在。它将读取所有可用的行,然后进行阻止。由于该工具应该与任何程序一起使用,因此在尝试读取之前,无法猜测输出何时结束。似乎也没有办法为 BufReader设置超时。

    最佳答案

    默认情况下,流被阻止。 TCP/IP流,文件系统流,管道流都被阻塞。当您告诉流为您提供大量字节时,它将停止并等待,直到它具有给定的字节数或发生其他事情为止(interrupt,流的末尾,错误)。
    操作系统渴望将数据返回到读取过程中,因此,如果您只想等待下一行并尽快处理它,那么Shepmaster在Unable to pipe to or from spawned child process more than once中建议的方法(以及他在此处的回答) )的作品。
    尽管从理论上讲它不必工作,因为允许操作系统使BufReader等待read中的更多数据,但实际上,操作系统更喜欢早期的“短读”而不是等待。
    当您需要处理多个流(例如子进程的BufReaderstdout)或多个进程时,这种基于stderr的简单方法会变得更加危险。例如,当子进程等待您耗尽其BufReader管道,而您的进程因等待其空stderr而被阻塞时,基于stdout的方法可能会死锁。
    同样,当您不希望程序无限期地等待子进程时,也不能使用BufReader。也许您想在 child 仍在工作时显示进度条或计时器,却没有任何输出。
    如果您的操作系统恰好不急于将数据返回给进程(首选“完全读取”而不是“短读取”),则不能使用基于BufReader的方法,因为在这种情况下,子进程会打印最后几行可能会以灰色区域结尾:操作系统将其捕获,但是它们的大小不足以填充BufReader的缓冲区。BufReader限于Read接口(interface)允许它对流执行的操作,它的阻止程度不亚于基础流。为了提高效率,它将以块的形式read输入,告诉操作系统填充尽可能多的可用缓冲区。
    您可能想知道为什么在这里块读取数据如此重要,为什么BufReader不能仅逐字节读取数据。问题是要从流中读取数据,我们需要操作系统的帮助。另一方面,我们不是操作系统,我们与它隔离工作,以免在我们的过程出现问题时避免困惑。因此,为了调用操作系统,需要过渡到“内核模式”,这也可能会导致“上下文切换”。这就是为什么调用操作系统读取每个字节很昂贵的原因。我们希望尽可能少的OS调用,因此我们可以分批获取流数据。
    要在不阻塞的情况下等待流,您需要一个非阻塞流。 MIO promises to have the required non-blocking stream support for pipes,很可能是PipeReader,但到目前为止我还没有检查出来。
    流的非阻塞性质应该使得可以分块读取数据,而不管操作系统是否喜欢“短读取”。因为非阻塞流永远不会阻塞。如果流中没有数据,它只会告诉您。
    在没有非阻塞流的情况下,您将不得不诉诸于生成线程,以便阻塞读取将在单独的线程中执行,因此不会阻塞您的主线程。您可能还想逐字节读取流,以便在操作系统不喜欢“短读取”的情况下立即对行分隔符使用react。这是一个工作示例:https://gist.github.com/ArtemGr/db40ae04b431a95f2b78
    P.S.这是一个函数示例,该函数允许通过共享的字节向量监视程序的标准输出:

    use std::io::Read;
    use std::process::{Command, Stdio};
    use std::sync::{Arc, Mutex};
    use std::thread;

    /// Pipe streams are blocking, we need separate threads to monitor them without blocking the primary thread.
    fn child_stream_to_vec<R>(mut stream: R) -> Arc<Mutex<Vec<u8>>>
    where
    R: Read + Send + 'static,
    {
    let out = Arc::new(Mutex::new(Vec::new()));
    let vec = out.clone();
    thread::Builder::new()
    .name("child_stream_to_vec".into())
    .spawn(move || loop {
    let mut buf = [0];
    match stream.read(&mut buf) {
    Err(err) => {
    println!("{}] Error reading from stream: {}", line!(), err);
    break;
    }
    Ok(got) => {
    if got == 0 {
    break;
    } else if got == 1 {
    vec.lock().expect("!lock").push(buf[0])
    } else {
    println!("{}] Unexpected number of bytes: {}", line!(), got);
    break;
    }
    }
    }
    })
    .expect("!thread");
    out
    }

    fn main() {
    let mut cat = Command::new("cat")
    .stdin(Stdio::piped())
    .stdout(Stdio::piped())
    .stderr(Stdio::piped())
    .spawn()
    .expect("!cat");

    let out = child_stream_to_vec(cat.stdout.take().expect("!stdout"));
    let err = child_stream_to_vec(cat.stderr.take().expect("!stderr"));
    let mut stdin = match cat.stdin.take() {
    Some(stdin) => stdin,
    None => panic!("!stdin"),
    };
    }
    通过几个帮助程序,我正在使用它来控制SSH session :
    try_s! (stdin.write_all (b"echo hello world\n"));
    try_s! (wait_forˢ (&out, 0.1, 9., |s| s == "hello world\n"));
    P.S.请注意,async-std中的 read调用上的 await也处于阻塞状态。它只是阻塞一个 future 链(本质上是一个无堆栈的绿色线程),而不是阻塞系统线程。 poll_read是非阻塞接口(interface)。在 async-std#499中,我问开发人员这些API是否有简短的读取保证。

    关于process - 如何在不阻塞Rust的情况下读取子进程的输出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59374194/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com