是否可以同时从一组JoinHandle<()>
任务中读取Future
流 * 并 * 用新任务更新该组任务?
我目前有一个Service
,它运行一些长任务,唯一的问题是,我实际上希望(如果可能的话)同时添加 * new * 任务--通过某种类型的Receiver
通道发送的标志(为了简化起见,下面没有显示)。
考虑到Service::run
中handles
被该函数所拥有,我倾向于"否",这是不可能的。这是真的吗?如果这在我的设置下是不可能的,有没有什么方法可以调整下面的代码使之成为可能?
我读到in this answer,将HashMap
Package 在Option
中允许我在Service::run
中使用.take()
,因为需要拥有该值才能调用.into_values()
。然而,这样做的问题是.take()
消耗Mutex
中的值,留下None
。
下面是我的最小可重复的例子(没有编译这个,但应该给出的想法):
use tokio::{sleep, time::Duration, task::JoinHandle};
use async_std::{Mutex, Arc};
use futures::{
stream::{FuturesUnordered, StreamExt},
Future,
};
type Handles = Arc<Mutex<Option<HashMap<String, JoinHandle<()>>>>>;
fn a_task() -> impl Future<Output = ()> {
async move {
sleep(Duration::from_secs(3)).await;
}
}
fn the_update_task(handles: Handles) -> impl Future<Output = ()> {
async move {
// would like to update `handles` here as I get new data from a channel
// calling .take() in Service::run nukes my handles here :(
}
}
struct Service {
handles: Handles,
}
impl Service {
fn new() -> Self {
let handles = Arc::new(Mutex::new(Some(HashMap::default())));
let handle = tokio::spawn(the_update_task(handles.clone());
Self { handles }
}
async fn add_a_task(&mut self, id: String) {
let handle = tokio::spawn(a_task());
self.handles.lock().await.as_mut().unwrap().insert(id, handle);
}
async fn run(self) {
let Service { handles, .. } = self;
let mut futs = FuturesUnordered::from_iter(
handles.lock().await.take().unwrap().into_values()
);
while let Some(fut) = futs.next().await {
info!("I completed a task! fut:?}");
}
}
}
#[tokio::main]
async fn main() {
let mut srvc = Service::new();
srvc.add_task("1".to_string()).await;
srvc.add_task("2".to_string()).await;
let handle = tokio::spawn(srv.run());
handle.await;
}
我试过了
- 使用
Arc(Mutex(HashMap))
- 使用
Arc(Mutex(Option(HashMap)))
我似乎总是得出同样的结论:
- 我无法同时 * 拥有
Service::run
中的 *handles
* 和 * 从代码的其他部分更新handles
(甚至是副本/引用
1条答案
按热度按时间polhcujo1#
只是在@user1937198评论的帮助下回答了我自己的问题。
解决方案是用新任务直接更新对
FuturesUnordered
的引用,而不是关注handles
,这大大简化了工作。