如何在Rust中读取子进程的输出而不阻塞?

wbrvyc0a  于 11个月前  发布在  其他
关注(0)|答案(5)|浏览(126)

我正在用Rust做一个小的ncurses应用程序,它需要与一个子进程通信。我已经有了一个用Common Lisp编写的原型。我正在尝试重写它,因为CL对这样一个小工具使用了大量的内存。
我在弄清楚如何与子进程交互时遇到了一些麻烦。
我目前正在做的事情大致是这样的:
1.创建流程:

let mut program = match Command::new(command)
    .args(arguments)
    .stdin(Stdio::piped())
    .stdout(Stdio::piped())
    .stderr(Stdio::piped())
    .spawn()
{
    Ok(child) => child,
    Err(_) => {
        println!("Cannot run program '{}'.", command);
        return;
    }
};

字符串
1.将其传递给一个无限循环(直到用户退出),该循环读取和处理输入,并像这样侦听输出(并将其写入屏幕):

fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
    match program.stdout {
        Some(ref mut out) => {
            let mut buf_string = String::new();
            match out.read_to_string(&mut buf_string) {
                Ok(_) => output_viewer.append_string(buf_string),
                Err(_) => return,
            };
        }
        None => return,
    };
}


但是对read_to_string的调用会阻塞程序,直到进程退出。从我所看到的read_to_endread似乎也会阻塞。如果我尝试运行像ls这样立即退出的程序,它会工作,但是对于像pythonsbcl这样不退出的程序,它只会在我手动杀死子进程后继续运行。
基于this answer,我将代码改为使用BufReader

fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
        match program.stdout.as_mut() {
            Some(out) => {
                let buf_reader = BufReader::new(out);
                for line in buf_reader.lines() {
                    match line {
                        Ok(l) => {
                            output_viewer.append_string(l);
                        }
                        Err(_) => return,
                    };
                }
            }
            None => return,
        }
    }


然而,问题仍然是一样的。它将读取所有可用的行,然后阻塞。由于该工具应该与任何程序一起工作,因此在尝试读取之前,没有办法猜测输出何时结束。似乎也没有办法为BufReader设置超时。

kx1ctssn

kx1ctssn1#

流默认是阻塞的。TCP/IP流,文件系统流,管道流,它们都是阻塞的。当你告诉一个流给予你一个字节块时,它会停止并等待,直到它有给定的字节量或其他事情发生(interrupt,流结束,错误)。
操作系统渴望将数据返回到阅读进程,所以如果你想要的只是等待下一行,并在它到来时立即处理它,那么Shepmaster在Unable to pipe to or from spawned child process more than once(以及他在这里的回答)中建议的方法就可以工作了。
虽然在理论上它不需要工作,因为操作系统允许BufReader等待read中的更多数据,但实际上操作系统更喜欢早期的“短读”而不是等待。
当您需要处理多个流(如子进程的stdoutstderr)或多个进程时,这种简单的基于BufReader的方法变得更加危险。例如,当子进程等待您耗尽其stderr管道时,而您的进程因等待其空stdout而阻塞时,基于BufReader的方法可能会死锁。
类似地,当你不想让你的程序无限期地等待子进程时,你也不能使用BufReader。也许你想在子进程仍在工作时显示一个进度条或计时器,而不给你任何输出。
如果你的操作系统碰巧不急于将数据返回给进程(更喜欢“完整读取”而不是“短读取”),你就不能使用基于BufReader的方法,因为在这种情况下,子进程打印的最后几行可能会出现在灰色区域:操作系统得到了它们,但它们不够大,无法填满BufReader的缓冲区。
BufReader受限于Read接口允许它对流做的事情,它的阻塞程度并不比底层流少。为了提高效率,它将分块读取输入,告诉操作系统尽可能多地填充其可用的缓冲区。
你可能想知道为什么阅读数据在这里如此重要,为什么BufReader不能一个字节一个字节地读取数据。问题是,从流中读取数据需要操作系统的帮助。另一方面,我们不是操作系统,我们与它隔离工作,所以为了调用操作系统,需要转换到“内核模式”,这也可能导致“上下文切换”。这就是为什么调用操作系统来读取每个字节是昂贵的。我们希望尽可能少的操作系统调用,所以我们批量获取流数据。
要在不阻塞的情况下等待流,你需要一个 * 非阻塞流 *. MIO promises to have the required non-blocking stream support for pipes,最有可能是PipeReader,但我还没有检查过。
流的非阻塞特性应该使它能够以块的形式读取数据,而不管操作系统是否喜欢“短读”。因为非阻塞流永远不会阻塞。如果流中没有数据,它会简单地告诉你。
在没有非阻塞流的情况下,你必须求助于派生线程,这样阻塞读取将在一个单独的线程中执行,从而不会阻塞你的主线程。你可能还想逐字节地读取流,以便在操作系统不喜欢“短读取”的情况下立即对行分隔符做出React。下面是一个工作示例:https://gist.github.com/ArtemGr/db40ae04b431a95f2b78
P.S.这里有一个函数的例子,它允许通过共享的字节向量来监视程序的标准输出:

use std::io::Read;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread;

/// Pipe streams are blocking, we need separate threads to monitor them without blocking the primary thread.
fn child_stream_to_vec<R>(mut stream: R) -> Arc<Mutex<Vec<u8>>>
where
    R: Read + Send + 'static,
{
    let out = Arc::new(Mutex::new(Vec::new()));
    let vec = out.clone();
    thread::Builder::new()
        .name("child_stream_to_vec".into())
        .spawn(move || loop {
            let mut buf = [0];
            match stream.read(&mut buf) {
                Err(err) => {
                    println!("{}] Error reading from stream: {}", line!(), err);
                    break;
                }
                Ok(got) => {
                    if got == 0 {
                        break;
                    } else if got == 1 {
                        vec.lock().expect("!lock").push(buf[0])
                    } else {
                        println!("{}] Unexpected number of bytes: {}", line!(), got);
                        break;
                    }
                }
            }
        })
        .expect("!thread");
    out
}

fn main() {
    let mut cat = Command::new("cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .expect("!cat");

    let out = child_stream_to_vec(cat.stdout.take().expect("!stdout"));
    let err = child_stream_to_vec(cat.stderr.take().expect("!stderr"));
    let mut stdin = match cat.stdin.take() {
        Some(stdin) => stdin,
        None => panic!("!stdin"),
    };
}

字符串
通过几个助手,我使用它来控制SSH会话:

try_s! (stdin.write_all (b"echo hello world\n"));
try_s! (wait_forˢ (&out, 0.1, 9., |s| s == "hello world\n"));


P.S.请注意,awaitawait中的读调用也是阻塞的。它只是阻止了一个系统线程,而不是阻止一个未来的链(本质上是一个无堆栈的绿色线程)。poll_read是非阻塞接口。在await中,我问过开发人员这些API是否有短读保证。
P.S.在Nom中可能有类似的问题:“* 我们希望告诉IO端根据解析器的结果(不完整或不完整)重新填充 *”
P.S.看看流阅读是如何在crossterm中实现的可能会很有趣。对于Windows,在poll.rs中,他们使用的是原生的WaitForMultipleObjects。在unix.rs中,他们使用的是mio poll

4smxwvx5

4smxwvx52#

时雄的Command

下面是使用tokio 0.2的示例:

use std::process::Stdio;
use futures::StreamExt; // 0.3.1
use tokio::{io::BufReader, prelude::*, process::Command}; // 0.2.4, features = ["full"]

#[tokio::main]
async fn main() {
    let mut cmd = Command::new("/tmp/slow.bash")
        .stdout(Stdio::piped()) // Can do the same for stderr
        .spawn()
        .expect("cannot spawn");

    let stdout = cmd.stdout().take().expect("no stdout");
    // Can do the same for stderr

    // To print out each line
    // BufReader::new(stdout)
    //     .lines()
    //     .for_each(|s| async move { println!("> {:?}", s) })
    //     .await;

    // To print out each line *and* collect it all into a Vec
    let result: Vec<_> = BufReader::new(stdout)
        .lines()
        .inspect(|s| println!("> {:?}", s))
        .collect()
        .await;

    println!("All the lines: {:?}", result);
}

字符串

Tokio-Threadpool

下面是一个使用tokio 0.1和tokio-threadpool的例子。我们使用blocking函数在线程中启动进程。我们使用stream::poll_fn将其转换为流

use std::process::{Command, Stdio};
use tokio::{prelude::*, runtime::Runtime}; // 0.1.18
use tokio_threadpool; // 0.1.13

fn stream_command_output(
    mut command: Command,
) -> impl Stream<Item = Vec<u8>, Error = tokio_threadpool::BlockingError> {
    // Ensure that the output is available to read from and start the process
    let mut child = command
        .stdout(Stdio::piped())
        .spawn()
        .expect("cannot spawn");
    let mut stdout = child.stdout.take().expect("no stdout");

    // Create a stream of data
    stream::poll_fn(move || {
        // Perform blocking IO
        tokio_threadpool::blocking(|| {
            // Allocate some space to store anything read
            let mut data = vec![0; 128];
            // Read 1-128 bytes of data
            let n_bytes_read = stdout.read(&mut data).expect("cannot read");

            if n_bytes_read == 0 {
                // Stdout is done
                None
            } else {
                // Only return as many bytes as we read
                data.truncate(n_bytes_read);
                Some(data)
            }
        })
    })
}

fn main() {
    let output_stream = stream_command_output(Command::new("/tmp/slow.bash"));

    let mut runtime = Runtime::new().expect("Unable to start the runtime");

    let result = runtime.block_on({
        output_stream
            .map(|d| String::from_utf8(d).expect("Not UTF-8"))
            .fold(Vec::new(), |mut v, s| {
                print!("> {}", s);
                v.push(s);
                Ok(v)
            })
    });

    println!("All the lines: {:?}", result);
}


这里有许多可能的折衷,例如,总是分配128字节并不理想,但它很容易实现。

技术支持

作为参考,下面是slow.bash

#!/usr/bin/env bash

set -eu

val=0

while [[ $val -lt 10 ]]; do
    echo $val
    val=$(($val + 1))
    sleep 1
done


另请参阅:

csbfibhn

csbfibhn3#

如果Unix支持足够,您还可以使两个输出流成为非阻塞的,并像在set_nonblocking上使用set_nonblocking函数一样轮询它们。
命令生成返回的ChildStdoutChildStderrStdio(包含文件描述符),您可以直接修改这些句柄的读取行为,使其成为非阻塞的。
基于jcreekmore/timeout-readwrite-rsanowell/nonblock-rs的工作,我使用这个 Package 器来修改流句柄:

extern crate libc;
use std::io::Read;
use std::os::unix::io::AsRawFd;
use libc::{F_GETFL, F_SETFL, fcntl, O_NONBLOCK};

fn set_nonblocking<H>(handle: &H, nonblocking: bool) -> std::io::Result<()>
where
    H: Read + AsRawFd,
{
    let fd = handle.as_raw_fd();
    let flags = unsafe { fcntl(fd, F_GETFL, 0) };
    if flags < 0 {
        return Err(std::io::Error::last_os_error());
    }
    let flags = if nonblocking{
        flags | O_NONBLOCK
    } else {
        flags & !O_NONBLOCK
    };
    let res = unsafe { fcntl(fd, F_SETFL, flags) };
    if res != 0 {
        return Err(std::io::Error::last_os_error());
    }
    Ok(())
}

字符串
您可以像管理其他任何非阻塞流一样管理这两个流。下面的示例基于polling crate,它可以很容易地处理read事件,而BufReader用于行阅读:

use std::process::{Command, Stdio};
use std::path::PathBuf;
use std::io::{BufReader, BufRead};
use std::thread;
extern crate polling;
use polling::{Event, Poller};

fn main() -> Result<(), std::io::Error> {
    let path = PathBuf::from("./worker.sh").canonicalize()?;

    let mut child = Command::new(path)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .expect("Failed to start worker");

    let handle = thread::spawn({
        let stdout = child.stdout.take().unwrap();
        set_nonblocking(&stdout, true)?;
        let mut reader_out = BufReader::new(stdout);

        let stderr = child.stderr.take().unwrap();
        set_nonblocking(&stderr, true)?;
        let mut reader_err = BufReader::new(stderr);

        move || {
            let key_out = 1;
            let key_err = 2;
            let mut out_closed = false;
            let mut err_closed = false;

            let poller = Poller::new().unwrap();
            poller.add(reader_out.get_ref(), Event::readable(key_out)).unwrap();
            poller.add(reader_err.get_ref(), Event::readable(key_err)).unwrap();

            let mut line = String::new();
            let mut events = Vec::new();
            loop {
                // Wait for at least one I/O event.
                events.clear();
                poller.wait(&mut events, None).unwrap();

                for ev in &events {
                    // stdout is ready for reading
                    if ev.key == key_out {
                        let len = match reader_out.read_line(&mut line) {
                            Ok(len) => len,
                            Err(e) => {
                                println!("stdout read returned error: {}", e);
                                0
                            }
                        };
                        if len == 0 {
                            println!("stdout closed (len is null)");
                            out_closed = true;
                            poller.delete(reader_out.get_ref()).unwrap();
                        } else {
                            print!("[STDOUT] {}", line);
                            line.clear();
                            // reload the poller
                            poller.modify(reader_out.get_ref(), Event::readable(key_out)).unwrap();
                        }
                    }

                    // stderr is ready for reading
                    if ev.key == key_err {
                        let len = match reader_err.read_line(&mut line) {
                            Ok(len) => len,
                            Err(e) => {
                                println!("stderr read returned error: {}", e);
                                0
                            }
                        };
                        if len == 0 {
                            println!("stderr closed (len is null)");
                            err_closed = true;
                            poller.delete(reader_err.get_ref()).unwrap();
                        } else {
                            print!("[STDERR] {}", line);
                            line.clear();
                            // reload the poller
                            poller.modify(reader_err.get_ref(), Event::readable(key_err)).unwrap();
                        }
                    }
                }

                if out_closed && err_closed {
                    println!("Stream closed, exiting process thread");
                    break;
                }
            }
        }
    });

    handle.join().unwrap();
    Ok(())
}


此外,使用EventFd上的 Package 器,可以轻松地停止另一个线程的进程,而无需阻塞或主动轮询,并且仅使用单个线程。

**EDIT:**我测试了一下,好像polling crate会自动在非阻塞模式下设置polled句柄。如果你想直接使用nix::poll对象,set_nonblocking函数还是很有用的。

xxb16uws

xxb16uws4#

我遇到过足够多的用例,在这些用例中,通过行分隔的文本与子进程进行交互是很有用的,我为此编写了一个crate,interactive_process
我想原来的问题早就解决了,但我想这可能对别人有帮助。

mkshixfv

mkshixfv5#

我在写GUI的时候也遇到了同样的问题。为了解决这个问题,我使用了一个线程来接收子进程的输出。这种方法可以防止阻塞主线程。但是这种方法需要子进程有一个退出命令或者自己终止。

fn main() {
    let mut stdin = comminucate("../mybin".to_string());
    stdin.write_all("some commands".as_bytes()).unwrap();
    stdin.write_all("quit".as_bytes()).unwrap();
}

pub fn comminucate(path: String) -> ChildStdin {
    let mut child = Command::new(path)
                    .stdin(Stdio::piped())
                    .stdout(Stdio::piped())
                    .spawn()
                    .unwrap();
    
    let stdin = child.stdin.take().unwrap();
    let stdout = child.stdout.take().unwrap();
    
    thread::spawn(move || {
        let reader = BufReader::new(stdout);
        /* it waits for new output */
        for line in reader.lines() {
            let output = line.unwrap();
            if output == "something1" {  /* do something */  }
            if output == "something2" {  /* do something */  }
        }
    });
    return stdin;
}

字符串

相关问题