rust 为什么将这个&mut数据库事务传递给这个闭包如此困难?

0ejtzxu1  于 2023-01-21  发布在  其他
关注(0)|答案(1)|浏览(195)

我在我的真实的项目中使用下面的代码。(显然这是简化的,一个文件只用于操场。)
铁 rust 探险家Playground
正如您所看到的,我尝试使用sqlxTransaction从一个函数到一个闭包。
但我被困住了。
我甚至不知道这(如此常见的戈郎模式)是否是最好的wat在 rust 做。但至少它现在应该工作。

use std::{future::Future, pin::Pin, sync::Arc};

pub trait Trait: Send + Sync + Player + Shirt {}

impl<T: Player + Shirt> Trait for T {}

pub type Lambda<'a, ArgT, ResT> =
    dyn Fn(ArgT) -> Pin<Box<dyn Future<Output = Result<ResT, String>> + Send + 'a>> + Sync + 'a;

#[async_trait::async_trait]
pub trait Player: Send + Sync {
    async fn player_create<'a>(
        &'a self,
        _input: &PlayerInput,
        lambda: &Lambda<'_, PlayerCreateLambdaArgs<'a>, DomainPlayer>,
    ) -> Result<DomainPlayer, String>;
}

#[async_trait::async_trait]
pub trait Shirt: Send + Sync {
    async fn shirt_get_next_and_increase<'a>(
        &'a self,
        tx: &'a mut sqlx::PgConnection,
        model: String,
    ) -> Result<i64, String>;
}

pub struct Repo {
    pub pool: Arc<sqlx::PgPool>,
}

impl Repo {
    pub fn new(pool: Arc<sqlx::PgPool>) -> Self {
        Self { pool }
    }
}

#[async_trait::async_trait]
impl Player for Repo {
    async fn player_create<'a>(
        &'a self,
        _input: &PlayerInput,
        lambda: &Lambda<'_, PlayerCreateLambdaArgs<'a>, DomainPlayer>,
    ) -> Result<DomainPlayer, String> {
        let mut tx = self.pool.begin().await.unwrap();

        // use _input here

        let shirt_next_value = Box::new(|model: String| {
            self::Shirt::shirt_get_next_and_increase(self, &mut tx, model)
        });

        let domain_player = lambda(PlayerCreateLambdaArgs { shirt_next_value }).await?;

        let res =
            sqlx::query_as::<_, DomainPlayer>("INSERT INTO player (...) VALUES (...) RETURNING *")
                .bind(domain_player.id)
                .bind(domain_player.shirt_number)
                .fetch_one(&mut *tx)
                .await
                .unwrap();

        Ok(res)
    }
}

#[async_trait::async_trait]
impl Shirt for Repo {
    async fn shirt_get_next_and_increase<'a>(
        &'a self,
        _tx: &'a mut sqlx::PgConnection,
        _model: String,
    ) -> Result<i64, String> {
        // Here I'm awaiting an async call for DB operations using the same DB transacion of the caller (_tx)...

        // use _tx here...

        let res = 123;

        Ok(res)
    }
}

pub struct Needs {
    pub command_pg_repo: Arc<dyn Trait>,
}

#[derive(Default)]
pub struct PlayerInput {
    pub id: String,
}

#[derive(Debug, Default, Clone, sqlx::FromRow)]
pub struct DomainPlayer {
    pub id: String,
    pub shirt_number: i64,
}

pub struct PlayerCreateLambdaArgs<'a> {
    // other needed fields here
    pub shirt_next_value: Box<
        dyn FnMut(String) -> Pin<Box<dyn Future<Output = Result<i64, String>> + Send + 'a>>
            + Send
            + Sync
            + 'a,
    >,
}

pub struct Handler {
    needs: Arc<Needs>,
}

impl Handler {
    pub fn new(needs: Arc<Needs>) -> Self {
        Self { needs }
    }

    pub async fn handle(&self, input: &PlayerInput) -> Result<DomainPlayer, String> {
        let res = self
            .needs
            .command_pg_repo
            .player_create(&input, &|mut args| {
                let input = input;

                Box::pin(async move {
                    let shirt_number = (args.shirt_next_value)("player".to_string()).await?;

                    let o = DomainPlayer {
                        id: input.id.to_string(),
                        shirt_number,
                    };

                    Ok(o)
                })
            })
            .await?;

        Ok(res)
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let db_conn = sqlx::PgPool::connect("fake_url").await.unwrap();

    let pg_repo = Arc::new(Repo::new(Arc::new(db_conn)));

    let needs = Arc::new(Needs {
        command_pg_repo: pg_repo,
    });

    let handler = Handler::new(needs);

    let new_player_input = PlayerInput {
        id: "abc".to_string(),
    };

    let player = handler.handle(&new_player_input).await?;

    dbg!(player);

    Ok(())
}

错误:

error: captured variable cannot escape `FnMut` closure body
  --> src/main.rs:62:13
   |
57 |         let mut tx = self.pool.begin().await.unwrap();
   |             ------ variable defined here
...
61 |         let shirt_next_value = Box::new(|model: String| {
   |                                                       - inferred to be a `FnMut` closure
62 |             self::Shirt::shirt_get_next_and_increase(self, &mut tx, model)
   |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^--^^^^^^^^
   |             |                                                   |
   |             |                                                   variable captured here
   |             returns a reference to a captured variable which escapes the closure body
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape

error[E0597]: `tx` does not live long enough
  --> src/main.rs:62:65
   |
55 |         lambda: &Lambda<'_, PlayerCreateLambdaArgs<'a>, DomainPlayer>,
   |         ------ lifetime `'1` appears in the type of `lambda`
...
61 |         let shirt_next_value = Box::new(|model: String| {
   |                                         --------------- value captured here
62 |             self::Shirt::shirt_get_next_and_increase(self, &mut tx, model)
   |                                                                 ^^ borrowed value does not live long enough
...
65 |         let domain_player = lambda(PlayerCreateLambdaArgs { shirt_next_value }).await?;
   |                                                             ---------------- this usage requires that `tx` is borrowed for `'1`
...
76 |     }
   |     - `tx` dropped here while still borrowed

error[E0499]: cannot borrow `tx` as mutable more than once at a time
  --> src/main.rs:71:34
   |
55 |         lambda: &Lambda<'_, PlayerCreateLambdaArgs<'a>, DomainPlayer>,
   |         ------ lifetime `'1` appears in the type of `lambda`
...
61 |         let shirt_next_value = Box::new(|model: String| {
   |                                         --------------- first mutable borrow occurs here
62 |             self::Shirt::shirt_get_next_and_increase(self, &mut tx, model)
   |                                                                 -- first borrow occurs due to use of `tx` in closure
...
65 |         let domain_player = lambda(PlayerCreateLambdaArgs { shirt_next_value }).await?;
   |                                                             ---------------- this usage requires that `tx` is borrowed for `'1`
...
71 |                 .fetch_one(&mut *tx)
   |                                  ^^ second mutable borrow occurs here

Some errors have detailed explanations: E0499, E0597.
For more information about an error, try `rustc --explain E0499`.
uxhixvfz

uxhixvfz1#

代码有3个主要问题(更像是2.5个):
1.闭包不能返回对闭包拥有的东西的引用,或者闭包可变地借用的东西(更正式的说法是:Fn*函数/闭包traits家族的返回类型不能从闭包类型本身借用)。解决这个问题的一个方法是将tx移动到闭包中,然后移动到闭包返回的Future中。共享所有权可以通过std::sync::Arc实现,当需要给予另一个任务所有权时,可以克隆它并移动它。

  1. Fn闭包可以被并发调用,因此它们不能使用闭包拥有或借用的东西的可变引用,但是这段代码尝试使用从Fn闭包内部的外部作用域借用的&mut tx。(这个问题可以通过下面概述的解决方案绕过)。
    1.如果你想在共享的时候访问一些东西,你需要同步这些访问,这样一次只有一个东西可以访问它,在async Rust中实现这一点的一种方法是使用tokio::sync::Mutex
    结合以上内容,将tx放入Arc<Mutex<...>>,将Arc.clone()移动到需要所有权的任务中,并在需要可变访问它时使用.lock().await,这可能会解决您的问题(至少使它能够编译)。
    铁 rust 探测器
diff --git a/src/main.rs b/src/main.rs
index 89fc611..6f7d375 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,7 @@
 use std::{future::Future, pin::Pin, sync::Arc};
 
+use tokio::sync::Mutex;
+
 pub trait Trait: Send + Sync + Player + Shirt {}
 
 impl<T: Player + Shirt> Trait for T {}
@@ -42,12 +44,19 @@ impl Player for Repo {
         _input: &PlayerInput,
         lambda: &Lambda<'_, PlayerCreateLambdaArgs<'a>, DomainPlayer>,
     ) -> Result<DomainPlayer, String> {
-        let mut tx = self.pool.begin().await.unwrap();
+        let tx = Arc::new(Mutex::new(self.pool.begin().await.unwrap()));
 
         // use _input here
 
-        let shirt_next_value = Box::new(|model: String| {
-            self::Shirt::shirt_get_next_and_increase(self, &mut tx, model)
+        let shirt_next_value = Box::new({
+            let tx = tx.clone();
+            move |model: String| -> Pin<Box<dyn Future<Output = Result<i64, std::string::String>> + Send>> {
+                let tx = tx.clone();
+                Box::pin(async move {
+                    self::Shirt::shirt_get_next_and_increase(self, &mut *tx.lock().await, model)
+                        .await
+                })
+            }
         });
 
         let domain_player = lambda(PlayerCreateLambdaArgs { shirt_next_value }).await?;
@@ -56,7 +65,7 @@ impl Player for Repo {
             sqlx::query_as::<_, DomainPlayer>("INSERT INTO player (...) VALUES (...) RETURNING *")
                 .bind(domain_player.id)
                 .bind(domain_player.shirt_number)
-                .fetch_one(&mut *tx)
+                .fetch_one(&mut *tx.lock().await)
                 .await
                 .unwrap();

请注意,由于&mut PgConenction被传递到异步块内的shirt_get_next_and_increase,该异步块引用tx.lock().await返回的MutexGuard<Transaction<...>>,因此MutexGuard将一直保留到shirt_get_next_and_increase完成,即使它已退出这在这段代码中应该不是问题,因为它看起来基本上是顺序的,并且在shirt_get_next_and_increase完成之前不访问tx。如果这不是您想要的,(即,如果在shirt_get_next_and_increase正在运行时,您的实际代码 * 确实 * 并发地访问tx)您可以改为让shirt_get_next_and_increase获取&Mutex<Transaction<...>>,并且仅当它需要访问该连接时才使它保持该锁。
另一种解决方案的草图是重新构造代码,以便Transaction通过函数参数和返回值中的值传递,例如:

pub type Lambda<'a, ArgT, ResT> =
    dyn Fn(ArgT) -> Pin<Box<dyn Future<Output = Result<(ResT, ArgT), String>> + Send + 'a>> + Sync + 'a;
// ...
async fn shirt_get_next_and_increase<'a>(
    &'a self,
    mut tx: sqlx::Transaction<'static, sqlx::PgConnection>,
    _model: String,
) -> Result<(i64, sqlx::Transaction<'static, sqlx::PgConnection>), String> {
    // ...
    Ok((value, tx))
}

这可以解决借用和共享可变性问题,但可能会使API更加麻烦,并且可能由于其他原因而不可行。

相关问题