rust 如何在单个线程上按顺序运行异步任务?

omqzjyyz  于 2023-02-04  发布在  其他
关注(0)|答案(1)|浏览(144)

我正在开发一个使用rust-tokio异步执行的程序。主函数周期性地调用一个函数来附加到CSV文件以记录操作。
我想让CSV创建功能异步运行,并将其作为一个单独的任务运行,这样,如果CSV创建需要一些时间(例如等待Excel等其他应用程序发布它),我可以继续主功能。
有没有一种优雅的方式来实现这一点?
LocalSet看起来似乎可以完成这项工作,但是任务需要按顺序执行,所以CSV是按时间顺序的。对我来说,文档似乎不能保证这一点。
这里有一些伪代码来说明这个想法。本质上,我想的是一个需要完成的任务队列。

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let local = task::LocalSetOrdered::new(); //This is a fictitious struct

    let mut data: usize = 10; //For simplicity, just store a single number
    loop {
        // Some operations here
        data = data + 1;

        let data_clone = data.clone();
        //Add a new task to complete after all prior tasks
        local.push(async move {
            match append_to_csv(data_clone).await {
                Ok(_) => Ok(()),
                Err(_) => Err(()),
            }
        });

        sleep(Duration::from_secs(60)).await;
    }

    Ok(())
}

async fn append_to_csv(data_in: usize) -> Result<(), Box<dyn Error>> {
    loop {
        let file = match OpenOptions::new().write(true).append(true).open(filename) {
            Ok(f) => f,
            Err(_) => {
                //Error opening the file, try again
                sleep(Duration::from_secs(60)).await;
                continue;
            }
        };
        let wtr = csv::Writer::from_writer(file);

        let date_time = Utc::now();
        wtr.write_field(format!("{}", date_time.format("%Y/%m/%d %H:%M")))?;
        wtr.write_field(format!("{}", data_in))?;
        wtr.write_record(None::<&[u8]>)?; //Finish the line
    }
}
ej83mcc0

ej83mcc01#

您可以使用辅助任务写入csv文件,使用通道传递要写入的数据

use tokio::sync::mpsc::{channel, Receiver};

#[derive(Debug)]
pub struct CsvData(i32, &'static str);

async fn append_to_csv(mut rx: Receiver<CsvData>) {
    let mut wtr = csv::Writer::from_writer(std::io::stdout());
    while let Some(data) = rx.recv().await {
        wtr.write_record([&data.0.to_string(), data.1]).unwrap();
        wtr.flush().unwrap();
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = channel(10);
    tokio::spawn(async {
        append_to_csv(rx).await;
    });

    for i in 0.. {
        tx.send(CsvData(i, "Hello world")).await.unwrap();
    }
}

如果需要写入来源于多个任务的数据,可以克隆通道发送方。

相关问题