如何在Rust中处理队列中的异步作业?

rqqzpn5f  于 2023-08-05  发布在  其他
关注(0)|答案(1)|浏览(87)

嘿,我有一个异步进程,它是资源耗尽的。我必须将它暴露给一个API,如何在后台和队列中一个接一个地处理作业。我使用了tokio::spawn,但所有派生的任务最终都同时运行。
我将在下面附上一个简单的可复制代码以供参考。

use axum::{extract::Path, routing::get, Router};

use tokio::time::Duration;

extern crate diesel;
extern crate tracing;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    let app = Router::new().route("/sleep/:id", get(sleep_and_print));
    let addr = std::net::SocketAddr::from(([0, 0, 0, 0], 3000));
    tracing::info!("Listening on {}", addr);

    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn sleep_and_print(Path(timer): Path<i32>) -> String {
    tokio::spawn(async move {
        start_timer_send_json(timer).await;
    });
    format!("{{\"timer\": {}}}", timer)
}

async fn start_timer_send_json(timer: i32) {
    println!("Start timer {}.", timer);

    tokio::time::sleep(Duration::from_secs(300)).await;

    println!("Timer {} done.", timer);
}

字符串

lxkprmvk

lxkprmvk1#

只需spawn一个任务来进行处理并使用通道与之通信:

use axum::{extract::Path, routing::get, Router};

use tokio::time::Duration;
use tokio::sync::{mpsc, oneshot}

extern crate diesel;
extern crate tracing;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    // Create a channel to send requests to the processing task
    let (tx, rx) = tokio::sync::channel();
    // Spawn a task to do all the processing. Since this is a single
    // task, all processing will be done sequentially.
    tokio::spawn (async move { process (rx).await; });

    // Pass the tx channel along to the GET handler so that it can
    // send requests to the processing task
    let app = Router::new().route("/sleep/:id", get(move |path| {
        sleep_and_print (path, &tx);
    }));
    let addr = std::net::SocketAddr::from(([0, 0, 0, 0], 3000));
    tracing::info!("Listening on {}", addr);

    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn process (rx: mpsc::Receiver<(i32, oneshot::Sender<String>)>) 
{
    // Receive the next queued request
    while let Ok ((timer, tx)) = rx.recv().await {
        // Process the request
        start_timer_send_json(timer).await;
        // Send back the result
        if let Err (e) = tx.send (format!("{{\"timer\": {}}}", timer)) {
            println!("{:?}", e);
        }
    }
}

async fn sleep_and_print(
    Path(timer): Path<i32>, 
    tx: &mpsc::Sender<(i32, oneshot::Sender<String>)>) -> String 
{
    // Create a channel to get the result
    let (otx, orx) = oneshot::new();
    // Send our request to the processing task
    tx.send ((timer, otx)).unwrap();
    // Wait for the processing result
    orx.await.unwrap()
}

async fn start_timer_send_json(timer: i32) {
    println!("Start timer {}.", timer);

    tokio::time::sleep(Duration::from_secs(300)).await;

    println!("Timer {} done.", timer);
}

字符串

相关问题