我是rust的初学者,我正在尝试使用rust的异步编程。
在我的需求场景中,我想创建一个空的Future,经过一个复杂的多轮调度过程后,在另一个线程中完成,Java的CompletableFuture::complete很好地满足了我的需求。
我尝试过寻找Rust的实现,但是还没有找到。
有可能在拉斯特做吗?
我从下面的评论中了解到,使用通道更符合rust的设计。
我的场景是一个层次化的调度执行器,例如,Task1将被拆分为多个驱动程序,每个驱动程序将使用多线程(rayon threadpool)来做一些计算工作,并且前一个驱动程序的状态改变会触发下一个驱动程序的执行,整个任务的结果是最后一个驱动程序的输出,中间驱动程序没有输出。我的async函数不能直接从一个spawn任务中获取结果,所以我需要一个共享的堆栈变量或通道来传输结果。
所以我真正想要的是:最后一个在rayon线程中执行的驱动程序,它可以通过通道的标识获得通道的tx,而无需存储它(以简化状态更改过程)。
我发现oneshot的tx和rx不能被复制,它们不是线程安全的,并且tx的send方法需要所有权。所以,我不能将tx存储在主线程中,让最后一个驱动程序通过识别找到它是tx。但是我可以使用mpsc来做到这一点,我写了2个演示并将其粘贴到问题的主体中,但我必须创建容量为1的mpsc并手动关闭它。
我写了2个演示,如下所述。我想知道这是否是一个适当的和有效的使用mpsc?
使用oneshot实现的版本无法工作。
#[tokio::test]
pub async fn test_async() -> Result<()>{
let mut executor = Executor::new();
let res1 = executor.run(1).await?;
let res2 = executor.run(2).await?;
println!("res1 {}, res2 {}", res1, res2);
Ok(())
}
struct Executor {
pub pool: ThreadPool,
pub txs: Arc<DashMap<i32, RwLock<oneshot::Sender<i32>>>>,
}
impl Executor {
pub fn new() -> Self {
Executor{
pool: ThreadPoolBuilder::new().num_threads(10).build().unwrap(),
txs: Arc::new(DashMap::new()),
}
}
pub async fn run(&mut self, index: i32) -> Result<i32> {
let (tx, rx) = oneshot::channel();
self.txs.insert(index, RwLock::new(tx));
let txs_clone = self.txs.clone();
self.pool.spawn(move || {
let spawn_tx = txs_clone.get(&index).unwrap();
let guard = block_on(spawn_tx.read());
// cannot work, send need ownership, it will cause move of self
guard.send(index);
});
let res = rx.await;
return Ok(res.unwrap());
}
}
使用mpsc实现的版本,可以工作,不确定性能
#[tokio::test]
pub async fn test_async() -> Result<()>{
let mut executor = Executor::new();
let res1 = executor.run(1).await?;
let res2 = executor.run(2).await?;
println!("res1 {}, res2 {}", res1, res2);
// close channel after task finished
executor.close(1);
executor.close(2);
Ok(())
}
struct Executor {
pub pool: ThreadPool,
pub txs: Arc<DashMap<i32, RwLock<mpsc::Sender<i32>>>>,
}
impl Executor {
pub fn new() -> Self {
Executor{
pool: ThreadPoolBuilder::new().num_threads(10).build().unwrap(),
txs: Arc::new(DashMap::new()),
}
}
pub fn close(&mut self, index:i32) {
self.txs.remove(&index);
}
pub async fn run(&mut self, index: i32) -> Result<i32> {
let (tx, mut rx) = mpsc::channel(1);
self.txs.insert(index, RwLock::new(tx));
let txs_clone = self.txs.clone();
self.pool.spawn(move || {
let spawn_tx = txs_clone.get(&index).unwrap();
let guard = block_on(spawn_tx.value().read());
block_on(guard.deref().send(index));
});
// 0 mock invalid value
let mut res:i32 = 0;
while let Some(data) = rx.recv().await {
println!("recv data {}", data);
res = data;
break;
}
return Ok(res);
}
}
1条答案
按热度按时间e5nqia271#
DashMap
)。因此,这个答案将是渐进的,尽管它将继续专注于解决您所证明的问题,而不一定是您正在思考的问题...因为我没有水晶球。*我们将在示例中使用以下
Result
类型:串行执行
执行任务最简单的方法就是此时此地执行任务。
异步执行-内置
当任务的执行可能涉及大量计算时,在后台线程上执行它可能是有益的。
无论您使用的是哪种运行时都可能支持此功能,我将使用tokio进行演示:
异步执行-一次性
如果您希望更多地控制CPU绑定线程的数量,或者限制它们,或者根据不同的需要对计算机的CPU进行分区,那么异步运行时可能无法进行足够的配置,您可能更愿意使用线程池。
在这种情况下,可以通过通道实现与运行时的同步,最简单的通道是
oneshot
通道。注意,在上面所有的解决方案中,
task
对于它是如何执行的是不可知的,这是一个你应该努力争取的属性,因为它通过更巧妙地分离这两个概念,使得将来更容易改变处理执行的方式。