rust 如何在等待'JoinHandle'的同时更新'JoinHandle'?

hmae6n7t  于 2023-01-26  发布在  其他
关注(0)|答案(1)|浏览(130)

是否可以同时从一组JoinHandle<()>任务中读取Future流 * 并 * 用新任务更新该组任务?
我目前有一个Service,它运行一些长任务,唯一的问题是,我实际上希望(如果可能的话)同时添加 * new * 任务--通过某种类型的Receiver通道发送的标志(为了简化起见,下面没有显示)。
考虑到Service::runhandles被该函数所拥有,我倾向于"否",这是不可能的。这是真的吗?如果这在我的设置下是不可能的,有没有什么方法可以调整下面的代码使之成为可能?
我读到in this answer,将HashMap Package 在Option中允许我在Service::run中使用.take(),因为需要拥有该值才能调用.into_values()。然而,这样做的问题是.take()消耗Mutex中的值,留下None
下面是我的最小可重复的例子(没有编译这个,但应该给出的想法):

use tokio::{sleep, time::Duration, task::JoinHandle};
use async_std::{Mutex, Arc};
use futures::{
    stream::{FuturesUnordered, StreamExt},
    Future,
};

type Handles = Arc<Mutex<Option<HashMap<String, JoinHandle<()>>>>>;

fn a_task() -> impl Future<Output = ()> {
  async move {
    sleep(Duration::from_secs(3)).await;
  }
}

fn the_update_task(handles: Handles) -> impl Future<Output = ()> {
  async move {
    // would like to update `handles` here as I get new data from a channel
    // calling .take() in Service::run nukes my handles here :(
  }
}

struct Service {
  handles: Handles,
}

impl Service {
  fn new() -> Self {
    let handles = Arc::new(Mutex::new(Some(HashMap::default())));
    let handle = tokio::spawn(the_update_task(handles.clone());
    Self { handles }
  }

  async fn add_a_task(&mut self, id: String) {
    let handle = tokio::spawn(a_task());
    self.handles.lock().await.as_mut().unwrap().insert(id, handle);
  }

  async fn run(self) {
    let Service { handles, .. } = self;
    let mut futs = FuturesUnordered::from_iter(
       handles.lock().await.take().unwrap().into_values()
    );
    while let Some(fut) = futs.next().await {
      info!("I completed a task! fut:?}");
    }
  }
}

#[tokio::main]
async fn main() {
   let mut srvc = Service::new();
   srvc.add_task("1".to_string()).await;
   srvc.add_task("2".to_string()).await;

   let handle = tokio::spawn(srv.run());
   handle.await;
}

我试过了

  • 使用Arc(Mutex(HashMap))
  • 使用Arc(Mutex(Option(HashMap)))

我似乎总是得出同样的结论:

  • 我无法同时 * 拥有Service::run中的 * handles * 和 * 从代码的其他部分更新handles(甚至是副本/引用
polhcujo

polhcujo1#

只是在@user1937198评论的帮助下回答了我自己的问题。
解决方案是用新任务直接更新对FuturesUnordered的引用,而不是关注handles,这大大简化了工作。

use tokio::{sleep, time::Duration, task::JoinHandle};
use async_std::{Mutex, Arc};
use futures::{
    stream::{FuturesUnordered, StreamExt},
    Future,
};

fn a_task() -> impl Future<Output = ()> {
  async move {
    sleep(Duration::from_secs(3)).await;
  }
}

fn the_update_task(futs: Arc<Mutex<FuturesUnordered>>) -> impl Future<Output = ()> {
  async move {
    // Just push another task
    let fut = tokio::spawn(a_task());
    futs.lock().await.push(fut);
  }
}

struct Service {
  handles: HashMap<String, JoinHandle<()>>,
}

impl Service {
  fn new() -> Self {
    let handles = HashMap::default();
    Self { handles }
  }

  async fn add_a_task(&mut self, id: String) {
    let handle = tokio::spawn(a_task());
    self.handles.insert(id, handle);
  }

  async fn run(self) {
    let Service { handles, .. } = self;
    let futs = Arc::new(Mutex::new(FuturesUnordered::from_iter(handles.into_values())));

    tokio::spawn(the_update_task(futs.clone())).await.unwrap();

    while let Some(fut) = futs.lock().await.next().await {
      info!("I completed a task! fut:?}");
    }
  }
}

#[tokio::main]
async fn main() {
   let mut srvc = Service::new();
   srvc.add_task("1".to_string()).await;
   srvc.add_task("2".to_string()).await;

   let handle = tokio::spawn(srv.run());
   handle.await;
}

相关问题