rust 从另一个异步任务访问异步任务句柄

ecfsfe2w  于 2023-06-06  发布在  其他
关注(0)|答案(1)|浏览(158)

我正在处理一个产生多个异步任务的逻辑,每个异步任务都无限循环地做自己的事情。我想实现一个监视这些任务的任务。我正在做的方法如下:将每个衍生任务的JoinHandle推送到HashMap中,其中键是特定的唯一字符串,值是JoinHandle。然后在一个单独的任务中,我将循环并监视这些任务,通过调用is_finished()检查它们是否正在运行。
我面临的问题是,我不能将HashMap句柄传递给任务,而该任务将在不克隆它的情况下检查它们的状态。这是一个问题,因为(由于我有限的知识)这将只是在引擎盖下产生这些任务的副本,本质上我不会引用原始任务。
我决定尝试使用Arc智能指针,尽管我不确定我的方法是否正确。下面是一些代码:

type TasksMap = HashMap<String, Arc<JoinHandle<()>>>;

pub async fn monitor_each(db: &DbClient) -> Result<()> {
    let orderbooks = db::get_orderbooks().await?;
    
    for (address, _) in orderbooks {
        // do some stuff

        let ob_monitor_task = orderbook.clone().monitor();
        
        // Add the OBs to a map of tasks, so we can monitor if they're running
        tasks
            .entry(orderbook.addr.to_string())
            .or_insert(ob_monitor_task);
    }
    Ok(())
}

/// Periodically check status of tasks
async fn check_tasks_status(tasks: &TasksMap) -> Result<()> {
    tokio::spawn(async move {
        loop {
            for (ob_address, task) in tasks.iter() {
                let og_task = Arc::get_mut(task).unwrap(); // getting compile Error here
                if og_task.is_finished() {
                    error!("Monitor task not running for OB with address {ob_address}");
                }
            }
            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    })
    .await?
}

impl Orderbook {
    /// Starts a task that monitors periodically the orderbook.
    pub fn monitor(&mut self) -> Arc<JoinHandle<()>> {
        let handle = tokio::spawn(async move {
            loop {
                // do some stuff

                tokio::time::sleep(Duration::from_secs(5)).await;
            }
        });

        Arc::new(handle)
    }
}

因此,我在let og_task = Arc::get_mut(task).unwrap();处得到一个编译时错误:

Note: expected mutable reference `&mut Arc<_>` found reference `&Arc<tokio::task::JoinHandle<()>>`

我尝试过用不同的方法将task变量定义为mut,但我不确定我是否完全理解了发生了什么。
另外,我能确定从Arc得到的og_task句柄是原始句柄吗?

最小可重复示例here

vmdwslir

vmdwslir1#

我认为您需要先使用Arc::clone(task)来获得一个“owned”值,然后您可以自由地改变它。就像

for (ob_address, task) in tasks.iter() {
                let mut task = Arc::clone(task);
                let og_task = Arc::get_mut(&mut task).unwrap();
                ...
            }

相关问题