gpt4 book ai didi

io - Rust 中有没有相当于 Python subprocess.communicate 的功能?

转载 作者:行者123 更新时间:2023-11-29 08:01:24 25 4
gpt4 key购买 nike

我正在尝试把下面这个 Python 脚本移植到 Rust,它向子进程发送输入并接收其输出:

import subprocess

# 4 MiB of '?' (0x3f). A bytes literal is used because the pipes below are
# binary: it behaves the same on Python 2 and Python 3, whereas a str payload
# makes communicate() raise TypeError on Python 3.
data = b'\x3f' * (1024 * 4096)
child = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
# communicate() sends `data` to the child's stdin, reads its stdout until EOF
# and waits for the process to exit.
output, _ = child.communicate(data)
assert output == data

只要输入数据不超过 64k,我的尝试就能正常工作;一旦超过,程序就会卡住,大概是因为在全部输入写完之前,操作系统的管道缓冲区就已经被填满了。

use std::io::Write;

/// Payload sent to the child: 4 MiB of `?` (0x3f) bytes.
const DATA: [u8; 1024 * 4096] = [0x3f; 1024 * 4096];

/// Pipes `DATA` through `cat` and checks that the same number of bytes comes back.
///
/// Known limitation (this is the problem the question is about): the whole
/// payload is written to the child's stdin before anything is read from its
/// stdout, so once the payload is larger than the OS pipe buffers the write
/// blocks forever — presumably because `cat` is itself stuck writing to a
/// stdout pipe that nobody is draining yet.
fn main() {
    let mut child = std::process::Command::new("cat")
        .stdout(std::process::Stdio::piped())
        .stdin(std::process::Stdio::piped())
        .spawn()
        .unwrap();
    match child.stdin {
        Some(ref mut stdin) => {
            if let Err(err) = stdin.write_all(&DATA[..]) {
                // Format the error instead of passing it as the panic payload:
                // a non-string payload hides the message and is rejected by
                // the 2021 edition.
                panic!("failed to write to child stdin: {}", err);
            }
        }
        // Stdin was requested with `Stdio::piped()` above.
        None => unreachable!(),
    }
    // `wait_with_output` closes stdin, reads stdout to EOF and reaps the child.
    let res = child.wait_with_output();
    assert_eq!(res.unwrap().stdout.len(), DATA.len())
}

Rust 中是否有等效的 subprocess.communicate?也许是一个 select 等价物?可以用mio来解决这个问题吗?另外,似乎没有办法关闭标准输入。

这里的目标是构建一个高性能系统,所以我想避免为每个任务生成一个线程。

最佳答案

好吧,要完成这项工作需要不少代码,而且我需要把 mio 和 nix 结合起来使用:当文件描述符是管道时,mio 不会把这些实现了 AsRawFd 的对象设置为非阻塞,所以必须先手动完成这一步。

这是结果

extern crate mio;
extern crate bytes;

use mio::*;
use std::io;
use mio::unix::{PipeReader, PipeWriter};
use std::process::{Command, Stdio};
use std::os::unix::io::AsRawFd;
use nix::fcntl::FcntlArg::F_SETFL;
use nix::fcntl::{fcntl, O_NONBLOCK};
extern crate nix;

/// Per-child state driven by the event-loop callbacks.
struct SubprocessClient {
    /// Pipe end used to write to the child's stdin.
    stdin: PipeWriter,
    /// Pipe end used to read the child's stdout.
    stdout: PipeReader,
    /// Everything read from `stdout` so far.
    output: Vec<u8>,
    /// The bytes that are to be written to `stdin`.
    input: Vec<u8>,
    /// How many bytes of `input` have already been written.
    input_offset: usize,
    /// Scratch buffer that each read lands in before being appended to `output`.
    buf: [u8; 65536],
}


// Sends a message and expects to receive the same exact message, one at a time
impl SubprocessClient {
    /// Builds a client around the two pipe ends. `data` is copied into the
    /// client and becomes the input that `writable` streams to the child.
    fn new(stdin: PipeWriter, stdout: PipeReader, data: &[u8]) -> SubprocessClient {
        SubprocessClient {
            stdin: stdin,
            stdout: stdout,
            output: Vec::new(),
            buf: [0; 65536],
            input: data.to_vec(),
            input_offset: 0,
        }
    }

    /// Reads once from the child's stdout and appends whatever arrived to
    /// `output`.
    ///
    /// Returns the underlying I/O error if the read fails; a read that yields
    /// nothing (`Ok(None)`) is only logged.
    fn readable(&mut self, _event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
        println!("client socket readable");

        match self.stdout.try_read(&mut self.buf[..]) {
            Ok(None) => println!("CLIENT : spurious read wakeup"),
            Ok(Some(r)) => {
                println!("CLIENT : We read {} bytes!", r);
                self.output.extend_from_slice(&self.buf[..r]);
            }
            Err(e) => return Err(e),
        }
        Ok(())
    }

    /// Writes as much of the remaining input as the pipe accepts and shuts the
    /// event loop down once every byte has been written.
    ///
    /// A write error is not recoverable here, so it also stops the event loop
    /// (otherwise a level-triggered registration would keep reporting the
    /// descriptor) and is returned to the caller.
    fn writable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
        println!("client socket writable");

        match self.stdin.try_write(&self.input[self.input_offset..]) {
            Ok(None) => println!("client flushing buf; WOULDBLOCK"),
            Ok(Some(r)) => {
                println!("CLIENT : we wrote {} bytes!", r);
                self.input_offset += r;
            }
            Err(e) => {
                println!("client write failed; err={:?}", e);
                event_loop.shutdown();
                return Err(e);
            }
        }
        if self.input_offset == self.input.len() {
            // All input has been handed to the child; the caller finishes the
            // exchange outside the loop.
            event_loop.shutdown();
        }
        Ok(())
    }
}

/// Routes readiness notifications from the event loop to the client's
/// `readable` / `writable` callbacks.
impl Handler for SubprocessClient {
    type Timeout = usize;
    type Message = ();

    /// Dispatches one readiness event. A callback failure is reported and
    /// stops the event loop instead of being silently dropped.
    fn ready(
        &mut self,
        event_loop: &mut EventLoop<SubprocessClient>,
        token: Token,
        events: EventSet,
    ) {
        println!("ready {:?} {:?}", token, events);
        if events.is_readable() {
            if let Err(e) = self.readable(event_loop) {
                println!("CLIENT : read failed, stopping event loop: {:?}", e);
                event_loop.shutdown();
            }
        }
        if events.is_writable() {
            if let Err(e) = self.writable(event_loop) {
                println!("CLIENT : write failed, stopping event loop: {:?}", e);
                event_loop.shutdown();
            }
        }
    }
}



/// Converts a `nix` error into a `std::io::Error` carrying the same raw OS
/// error number.
pub fn from_nix_error(err: ::nix::Error) -> io::Error {
    let raw_errno = err.errno() as i32;
    io::Error::from_raw_os_error(raw_errno)
}

fn set_nonblock(s: &AsRawFd) -> io::Result<()> {
fcntl(s.as_raw_fd(), F_SETFL(O_NONBLOCK)).map_err(from_nix_error)
.map(|_| ())
}


const TEST_DATA : [u8; 1024 * 4096] = [40; 1024 * 4096];
pub fn echo_server() {
let mut event_loop = EventLoop::<SubprocessClient>::new().unwrap();
let process =
Command::new("cat")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn().unwrap();
let raw_stdin_fd;
match process.stdin {
None => unreachable!(),
Some(ref item) => {
let err = set_nonblock(item);
match err {
Ok(()) => {},
Err(e) => panic!(e),
}
raw_stdin_fd = item.as_raw_fd();
},
}
let raw_stdout_fd;
match process.stdout {
None => unreachable!(),
Some(ref item) => {
let err = set_nonblock(item);
match err {
Ok(()) => {},
Err(e) => panic!(e),
}
raw_stdout_fd = item.as_raw_fd();},
}
//println!("listen for connections {:?} {:?}", , process.stdout.unwrap().as_raw_fd());
let mut subprocess = SubprocessClient::new(PipeWriter::from(Io::from_raw_fd(raw_stdin_fd)),
PipeReader::from(Io::from_raw_fd(raw_stdout_fd)),
&TEST_DATA[..]);
let stdout_token : Token = Token(0);
let stdin_token : Token = Token(1);
event_loop.register(&subprocess.stdout, stdout_token, EventSet::readable(),
PollOpt::level()).unwrap();

// Connect to the server
event_loop.register(&subprocess.stdin, stdin_token, EventSet::writable(),
PollOpt::level()).unwrap();

// Start the event loop
event_loop.run(&mut subprocess).unwrap();
let res = process.wait_with_output();
match res {
Err(e) => {panic!(e);},
Ok(output) => {
subprocess.output.extend(&output.stdout);
println!("Final output was {:}\n", output.stdout.len());
},
}
println!("{:?}\n", subprocess.output.len());
}

// Program entry point: runs the `cat` echo demo once.
fn main() {
echo_server();
}

基本上,关闭标准输入的唯一方法就是调用 process.wait_with_output,因为标准输入句柄本身没有提供关闭(close)方法。

调用之后,wait_with_output 读到的剩余输出会被追加到输出数据向量的末尾。

现在已经有一个 crate 可以做到这一点:

https://crates.io/crates/subprocess-communicate

关于io - 相当于 Python 的 subprocess.communicate 在 Rust 中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37109336/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com