是否可以在Rust中实现像Java的CompletableFuture::complete这样的特性?

muk1a3rh  于 2023-02-16  发布在  Java
关注(0)|答案(1)|浏览(96)

我是rust的初学者,我正在尝试使用rust的异步编程。
在我的需求场景中,我想创建一个空的Future,经过一个复杂的多轮调度过程后,在另一个线程中完成,Java的CompletableFuture::complete很好地满足了我的需求。
我尝试过寻找Rust的实现,但是还没有找到。
有可能在拉斯特做吗?
我从下面的评论中了解到,使用通道更符合rust的设计。
我的场景是一个层次化的调度执行器,例如,Task1将被拆分为多个驱动程序,每个驱动程序将使用多线程(rayon threadpool)来做一些计算工作,并且前一个驱动程序的状态改变会触发下一个驱动程序的执行,整个任务的结果是最后一个驱动程序的输出,中间驱动程序没有输出。我的async函数不能直接从一个spawn任务中获取结果,所以我需要一个共享的堆栈变量或通道来传输结果。
所以我真正想要的是:最后一个在rayon线程中执行的驱动程序,它可以通过通道的标识获得通道的tx,而无需存储它(以简化状态更改过程)。
我发现oneshot的tx和rx不能被复制,它们不是线程安全的,并且tx的send方法需要所有权。所以,我不能将tx存储在主线程中,让最后一个驱动程序通过识别找到它是tx。但是我可以使用mpsc来做到这一点,我写了2个演示并将其粘贴到问题的主体中,但我必须创建容量为1的mpsc并手动关闭它。
我写了2个演示,如下所述。我想知道这是否是一个适当的和有效的使用mpsc?
使用oneshot实现的版本无法工作。

#[tokio::test]
pub async fn test_async() -> Result<()>{
    let mut executor = Executor::new();

    let res1 = executor.run(1).await?;
    let res2 = executor.run(2).await?;
    println!("res1 {}, res2 {}", res1, res2);
    Ok(())
}

struct Executor {
    pub pool: ThreadPool,
    pub txs: Arc<DashMap<i32, RwLock<oneshot::Sender<i32>>>>,
}

impl Executor {
    pub fn new() -> Self {
        Executor{
            pool: ThreadPoolBuilder::new().num_threads(10).build().unwrap(),
            txs: Arc::new(DashMap::new()),
        }
    }

    pub async fn run(&mut self, index: i32) -> Result<i32> {
        let (tx, rx) = oneshot::channel();
        self.txs.insert(index, RwLock::new(tx));
        let txs_clone = self.txs.clone();
        self.pool.spawn(move || {
            let spawn_tx = txs_clone.get(&index).unwrap();
            let guard = block_on(spawn_tx.read());
            // cannot work, send need ownership, it will cause move of self
            guard.send(index);
        });

        let res = rx.await;
        return Ok(res.unwrap());
    }
}

使用mpsc实现的版本,可以工作,不确定性能

#[tokio::test]
pub async fn test_async() -> Result<()>{
    let mut executor = Executor::new();

    let res1 = executor.run(1).await?;
    let res2 = executor.run(2).await?;
    println!("res1 {}, res2 {}", res1, res2);
    // close channel after task finished
    executor.close(1);
    executor.close(2);
    Ok(())
}

struct Executor {
    pub pool: ThreadPool,
    pub txs: Arc<DashMap<i32, RwLock<mpsc::Sender<i32>>>>,
}

impl Executor {
    pub fn new() -> Self {
        Executor{
            pool: ThreadPoolBuilder::new().num_threads(10).build().unwrap(),
            txs: Arc::new(DashMap::new()),
        }
    }

    pub fn close(&mut self, index:i32) {
        self.txs.remove(&index);
    }

    pub async fn run(&mut self, index: i32) -> Result<i32> {
        let (tx, mut rx) = mpsc::channel(1);
        self.txs.insert(index, RwLock::new(tx));
        let txs_clone = self.txs.clone();
        self.pool.spawn(move || {
            let spawn_tx = txs_clone.get(&index).unwrap();
            let guard = block_on(spawn_tx.value().read());
            block_on(guard.deref().send(index));
        });
        // 0 mock invalid value
        let mut res:i32 = 0;
        while let Some(data) = rx.recv().await {
            println!("recv data {}", data);
            res = data;
            break;
        }
        return Ok(res);
    }
}
e5nqia27

e5nqia271#

  • 免责声明:很难描绘出您想要实现的目标,因为所提供的示例很容易解决,没有理由增加复杂性(DashMap)。因此,这个答案将是渐进的,尽管它将继续专注于解决您所证明的问题,而不一定是您正在思考的问题...因为我没有水晶球。*

我们将在示例中使用以下Result类型:

type Result<T> = Result<T, Box<dyn Error + Send + Sync + 'static>>;

串行执行

执行任务最简单的方法就是此时此地执行任务。

impl Executor {
    pub async fn run<F>(&self, task: F) -> Result<i32>
    where
        F: FnOnce() -> Future<Output = Result<i32>>,
    {
        task().await
    }
}

异步执行-内置

当任务的执行可能涉及大量计算时,在后台线程上执行它可能是有益的。
无论您使用的是哪种运行时都可能支持此功能,我将使用tokio进行演示:

impl Executor {
    pub async fn run<F>(&self, task: F) -> Result<i32>
    where
        F: FnOnce() -> Result<i32>,
    {
        Ok(tokio::task::spawn_block(task).await??)
    }
}

异步执行-一次性

如果您希望更多地控制CPU绑定线程的数量,或者限制它们,或者根据不同的需要对计算机的CPU进行分区,那么异步运行时可能无法进行足够的配置,您可能更愿意使用线程池。
在这种情况下,可以通过通道实现与运行时的同步,最简单的通道是oneshot通道。

impl Executor {
    pub async fn run<F>(&self, task: F) -> Result<i32>
    where
        F: FnOnce() -> Result<i32>,
    {
        let (tx, mut rx) = oneshot::channel();

        self.pool.spawn(move || {
            let result = task();

            //  Decide on how to handle the fact that nobody will read the result.
            let _ = tx.send(result);
        });

        Ok(rx.await??)
    }
}

注意,在上面所有的解决方案中,task对于它是如何执行的是不可知的,这是一个你应该努力争取的属性,因为它通过更巧妙地分离这两个概念,使得将来更容易改变处理执行的方式。

相关问题