我在我的真实的项目中使用下面的代码。(显然这是简化的,一个文件只用于操场。)
铁 rust 探险家Playground
正如您所看到的,我尝试使用sqlxTransaction从一个函数到一个闭包。
但我被困住了。
我甚至不知道这(如此常见的戈郎模式)是否是最好的wat在 rust 做。但至少它现在应该工作。
use std::{future::Future, pin::Pin, sync::Arc};
pub trait Trait: Send + Sync + Player + Shirt {}
impl<T: Player + Shirt> Trait for T {}
pub type Lambda<'a, ArgT, ResT> =
dyn Fn(ArgT) -> Pin<Box<dyn Future<Output = Result<ResT, String>> + Send + 'a>> + Sync + 'a;
#[async_trait::async_trait]
pub trait Player: Send + Sync {
async fn player_create<'a>(
&'a self,
_input: &PlayerInput,
lambda: &Lambda<'_, PlayerCreateLambdaArgs<'a>, DomainPlayer>,
) -> Result<DomainPlayer, String>;
}
#[async_trait::async_trait]
pub trait Shirt: Send + Sync {
async fn shirt_get_next_and_increase<'a>(
&'a self,
tx: &'a mut sqlx::PgConnection,
model: String,
) -> Result<i64, String>;
}
pub struct Repo {
pub pool: Arc<sqlx::PgPool>,
}
impl Repo {
pub fn new(pool: Arc<sqlx::PgPool>) -> Self {
Self { pool }
}
}
#[async_trait::async_trait]
impl Player for Repo {
async fn player_create<'a>(
&'a self,
_input: &PlayerInput,
lambda: &Lambda<'_, PlayerCreateLambdaArgs<'a>, DomainPlayer>,
) -> Result<DomainPlayer, String> {
let mut tx = self.pool.begin().await.unwrap();
// use _input here
let shirt_next_value = Box::new(|model: String| {
self::Shirt::shirt_get_next_and_increase(self, &mut tx, model)
});
let domain_player = lambda(PlayerCreateLambdaArgs { shirt_next_value }).await?;
let res =
sqlx::query_as::<_, DomainPlayer>("INSERT INTO player (...) VALUES (...) RETURNING *")
.bind(domain_player.id)
.bind(domain_player.shirt_number)
.fetch_one(&mut *tx)
.await
.unwrap();
Ok(res)
}
}
#[async_trait::async_trait]
impl Shirt for Repo {
async fn shirt_get_next_and_increase<'a>(
&'a self,
_tx: &'a mut sqlx::PgConnection,
_model: String,
) -> Result<i64, String> {
// Here I'm awaiting an async call for DB operations using the same DB transacion of the caller (_tx)...
// use _tx here...
let res = 123;
Ok(res)
}
}
pub struct Needs {
pub command_pg_repo: Arc<dyn Trait>,
}
#[derive(Default)]
pub struct PlayerInput {
pub id: String,
}
#[derive(Debug, Default, Clone, sqlx::FromRow)]
pub struct DomainPlayer {
pub id: String,
pub shirt_number: i64,
}
pub struct PlayerCreateLambdaArgs<'a> {
// other needed fields here
pub shirt_next_value: Box<
dyn FnMut(String) -> Pin<Box<dyn Future<Output = Result<i64, String>> + Send + 'a>>
+ Send
+ Sync
+ 'a,
>,
}
pub struct Handler {
needs: Arc<Needs>,
}
impl Handler {
pub fn new(needs: Arc<Needs>) -> Self {
Self { needs }
}
pub async fn handle(&self, input: &PlayerInput) -> Result<DomainPlayer, String> {
let res = self
.needs
.command_pg_repo
.player_create(&input, &|mut args| {
let input = input;
Box::pin(async move {
let shirt_number = (args.shirt_next_value)("player".to_string()).await?;
let o = DomainPlayer {
id: input.id.to_string(),
shirt_number,
};
Ok(o)
})
})
.await?;
Ok(res)
}
}
#[tokio::main]
async fn main() -> Result<(), String> {
let db_conn = sqlx::PgPool::connect("fake_url").await.unwrap();
let pg_repo = Arc::new(Repo::new(Arc::new(db_conn)));
let needs = Arc::new(Needs {
command_pg_repo: pg_repo,
});
let handler = Handler::new(needs);
let new_player_input = PlayerInput {
id: "abc".to_string(),
};
let player = handler.handle(&new_player_input).await?;
dbg!(player);
Ok(())
}
错误:
error: captured variable cannot escape `FnMut` closure body
--> src/main.rs:62:13
|
57 | let mut tx = self.pool.begin().await.unwrap();
| ------ variable defined here
...
61 | let shirt_next_value = Box::new(|model: String| {
| - inferred to be a `FnMut` closure
62 | self::Shirt::shirt_get_next_and_increase(self, &mut tx, model)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^--^^^^^^^^
| | |
| | variable captured here
| returns a reference to a captured variable which escapes the closure body
|
= note: `FnMut` closures only have access to their captured variables while they are executing...
= note: ...therefore, they cannot allow references to captured variables to escape
error[E0597]: `tx` does not live long enough
--> src/main.rs:62:65
|
55 | lambda: &Lambda<'_, PlayerCreateLambdaArgs<'a>, DomainPlayer>,
| ------ lifetime `'1` appears in the type of `lambda`
...
61 | let shirt_next_value = Box::new(|model: String| {
| --------------- value captured here
62 | self::Shirt::shirt_get_next_and_increase(self, &mut tx, model)
| ^^ borrowed value does not live long enough
...
65 | let domain_player = lambda(PlayerCreateLambdaArgs { shirt_next_value }).await?;
| ---------------- this usage requires that `tx` is borrowed for `'1`
...
76 | }
| - `tx` dropped here while still borrowed
error[E0499]: cannot borrow `tx` as mutable more than once at a time
--> src/main.rs:71:34
|
55 | lambda: &Lambda<'_, PlayerCreateLambdaArgs<'a>, DomainPlayer>,
| ------ lifetime `'1` appears in the type of `lambda`
...
61 | let shirt_next_value = Box::new(|model: String| {
| --------------- first mutable borrow occurs here
62 | self::Shirt::shirt_get_next_and_increase(self, &mut tx, model)
| -- first borrow occurs due to use of `tx` in closure
...
65 | let domain_player = lambda(PlayerCreateLambdaArgs { shirt_next_value }).await?;
| ---------------- this usage requires that `tx` is borrowed for `'1`
...
71 | .fetch_one(&mut *tx)
| ^^ second mutable borrow occurs here
Some errors have detailed explanations: E0499, E0597.
For more information about an error, try `rustc --explain E0499`.
1条答案
按热度按时间uxhixvfz1#
代码有3个主要问题(更像是2.5个):
1.闭包不能返回对闭包拥有的东西的引用,或者闭包可变地借用的东西(更正式的说法是:
Fn*
函数/闭包traits家族的返回类型不能从闭包类型本身借用)。解决这个问题的一个方法是将tx
移动到闭包中,然后移动到闭包返回的Future
中。共享所有权可以通过std::sync::Arc
实现,当需要给予另一个任务所有权时,可以克隆它并移动它。Fn
闭包可以被并发调用,因此它们不能使用闭包拥有或借用的东西的可变引用,但是这段代码尝试使用从Fn
闭包内部的外部作用域借用的&mut tx
。(这个问题可以通过下面概述的解决方案绕过)。1.如果你想在共享的时候访问一些东西,你需要同步这些访问,这样一次只有一个东西可以访问它,在async Rust中实现这一点的一种方法是使用
tokio::sync::Mutex
。结合以上内容,将
tx
放入Arc<Mutex<...>>
,将Arc
的.clone()
移动到需要所有权的任务中,并在需要可变访问它时使用.lock().await
,这可能会解决您的问题(至少使它能够编译)。铁 rust 探测器
请注意,由于
&mut PgConenction
被传递到异步块内的shirt_get_next_and_increase
,该异步块引用tx.lock().await
返回的MutexGuard<Transaction<...>>
,因此MutexGuard将一直保留到shirt_get_next_and_increase
完成,即使它已退出这在这段代码中应该不是问题,因为它看起来基本上是顺序的,并且在shirt_get_next_and_increase
完成之前不访问tx
。如果这不是您想要的,(即,如果在shirt_get_next_and_increase
正在运行时,您的实际代码 * 确实 * 并发地访问tx
)您可以改为让shirt_get_next_and_increase
获取&Mutex<Transaction<...>>
,并且仅当它需要访问该连接时才使它保持该锁。另一种解决方案的草图是重新构造代码,以便
Transaction
通过函数参数和返回值中的值传递,例如:这可以解决借用和共享可变性问题,但可能会使API更加麻烦,并且可能由于其他原因而不可行。