rust 我如何`fut.await`在“后台”运行一个future(执行它但不等待它)?

1sbrub3j  于 2023-06-30  发布在  其他
关注(0)|答案(2)|浏览(138)

我希望能够启动一个在后台运行的future,而不是在父函数作用域中立即等待它。
类似于一个动态的join_all,我可以在一个循环中添加新的期货到一个集合中,然后将集合传递给另一个函数,该函数可以等待整个集合(已经在运行)。
我希望能够做这样的事情:

join_all(vec![
    log_arg(&c),
    log_arg(&c)
]).await;

但问题是:

  • .await启动未来的执行,但也在当前函数等待它。
  • 如何在不等待的情况下开始执行?
  • &c不是'static
  • 这似乎是所有时雄API的一个要求,即“启动未来的执行,而不等待当前fn范围内的结果”,例如spawn_local
  • 如果所有的期货都在一根线上,那就没问题了。

示例:
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=85aa3a517bd1906b285f5a5586d7fa6d

q3qa4bjr

q3qa4bjr1#

如何在不等待的情况下开始执行?
spawn任务。
这似乎是所有时雄API的一个要求,即“启动未来的执行,而不等待当前fn范围内的结果”,例如spawn_local
是的,因为你正在生成一个任务,所以这个任务有可能比拥有这个项目的任何东西都存在,从而导致一个悬空引用,这是不允许的。事实上,在使用spawn_local时,这几乎是一个保证:它将在同一个线程(/scheduler)上产生一个任务,并且在当前任务退出或终止之前,该任务根本无法运行。
另一种方法是使用“范围任务”(不必立即等待,但最终必须加入)。然而,到目前为止,时雄对结构化并发(有作用域的任务)的支持已经胎死腹中。因此Rust编译器无法知道一个任务没有从初始化它的作用域中“逃逸”,因此它必须假设它确实如此,因此任务捕获的任何内容都应该能够超过当前作用域。

pepwfjgg

pepwfjgg2#

这是wg-async scope proposal,这是tokio scoped tasks issue
当我遇到类似的问题时,我首先研究了async-scopedcrossbeam::scope的异步版本,或rayon::scope)。最后,我决定写一个新的类型,它为每个未来产生一个任务(或者,在我的例子中是Stream<Item=String>),当它被删除时,它确保任务完成。这本质上是“范围”思想的穷人实现。

// how long to wait to print remaining container logs
const DRAIN_LOGS_TIMEOUT: Duration = Duration::from_secs(5);

pub(crate) struct LogPrinter {
    tasks: Vec<JoinHandle<()>>,
}

impl Drop for LogPrinter {
    fn drop(&mut self) {
        for task in self.tasks.drain(..).rev() {
            println!("Finalizing LogPrinter");
            let result = futures::executor::block_on(
                tokio::time::timeout(DRAIN_LOGS_TIMEOUT, task));
            println!("Finalized LogPrinter {:?}", result);
        }
    }
}

impl LogPrinter {
    pub(crate) fn new() -> Self {
        Self {
            tasks: vec![],
        }
    }

    pub(crate) fn print(&mut self, log_stream: Fuse<(impl Stream<Item=String> + Unpin + Sized + Send + 'static)>) {
        let mut s = log_stream;
        let handle = tokio::spawn(async move {
            while select! {
                msg = s.next() => match msg {
                    Some(msg) => {
                        let msg: String = msg;
                        println!("{}", msg);
                        true
                    },
                    None => false
                },
            } {}
        });
        self.tasks.push(handle);
    }
}

我是这样用的

let mut log_printer: LogPrinter = LogPrinter::new();
let logs_skrouterd: impl Stream<Item=String> = stream_container_logs(&docker, "skrouterd", &skrouterd);

log_printer.print(logs_skrouterd.fuse());

log_printer超出范围时,drop将被执行。

相关问题