rust 在异步移动块中使用`&mut self`

xcitsw88  于 2023-05-07  发布在  其他
关注(0)|答案(2)|浏览(262)

我有一个WriterJob结构。它的功能并不重要;它所做的是产生一个长时间运行的任务,并为调用者提供一个API来检查作业的状态。

pub(crate) struct WriterJob {
    ...,
    status: Status,
}

impl WriterJob {
    async fn writer_job_impl(&mut self, rx: Receiver<WriteCommand>) {
       // Job implementation details. It's important that `self` is mutuable
       // since I update the job status
       // eg.
       // self.status = Status::Ready;
    }

    pub(crate) fn status(&self) -> Status {
       self.status
    }

    pub(crate) fn spawn_writer_job(&mut self) -> Sender<WriteCommand> {
        let (tx, rx) = mpsc::channel(10);

        let handle = tokio::spawn(async move {
            self.writer_job_impl(rx).await;
        });

        self.status = Status::Spawned;

        tx
    }

我得到这个错误:

error[E0521]: borrowed data escapes outside of associated function
  --> src/io/buffered_write.rs:92:22
   |
89 |       pub(crate) fn spawn_writer_job(&mut self) -> Sender<WriteCommand> {
   |                                      ---------
   |                                      |
   |                                      `self` is a reference that is only valid in the associated function body
   |                                      let's call the lifetime of this reference `'1`
...
92 |           let handle = tokio::spawn(async move {
   |  ______________________^
93 | |             self.writer_job_impl(rx).await;
94 | |         });
   | |          ^
   | |          |
   | |__________`self` escapes the associated function body here
   |            argument requires that `'1` must outlive `'static`

我想我理解编译器在抱怨它不知道self是否会和派生的任务一样长,因此出现了生存期错误。但我不确定这是一个好的方法。一种可能性是使用Arc<Mutex<Status>>Arc<RwLock<Status>>,但我不喜欢这种方法,因为我可能需要在self中添加更多的可互操作字段。有没有更干净的方法来做到这一点?

2hh7jdfx

2hh7jdfx1#

你不能假设self在执行handle的过程中没有被丢弃,这就是它抛出错误的原因。相反,您可以更改结构的设计,使其工作
1.创建一个子结构,其中包含需要在线程之间共享的数据

struct WriteJobInner {
    ...,
    status: Status,
}

1.创建对着色器数据执行操作的结构

// you can use 'std::sync::Mutex' if you need to touch the data in non-async
// functions
use tokio::sync::Mutex;
use std::sync::Arc;

#[derive(Clone)]
pub(crate) struct Writejob {
    inner: Arc<Mutex<WriteJobInner>>,
}

另外,也许你可以用tokio::sync::RwLock代替时雄的互斥,如果很多线程都试图对数据执行不可变操作,这会更好,这样可以让很多线程读取数据而不会阻塞其他只读线程。
1.最后一步是实现WriteJob函数,您可以使用&self而不是&mut self,因为您使用protected with mutex修改数据。

impl WriterJob {
    async fn writer_job_impl(&self, rx: Receiver<WriteCommand>) {
       // Job implementation details. It's important that `self` is mutuable
       // since I update the job status
       // eg.
       // *self.inner.lock().await.status = Status::Ready;
    }

    pub(crate) async fn status(&self) -> Status {
       // make sure the `status` implements copy trait
       *self.inner.lock().await.status
    }

    pub(crate) fn spawn_writer_job(&self) -> Sender<WriteCommand> {
        let (tx, rx) = mpsc::channel(10);
        // creates a new instance `Self` but all the instances of `self.inner`
        // references to the same shared state because it's wrapped by the
        // shader pointer `Arc`
        let writer = self.clone();

        let handle = tokio::spawn(async move {
            writer.writer_job_impl(rx).await;
        });

        *self.inner.status.lock().await = Status::Spawned;

        tx
    }
}
41ik7eoe

41ik7eoe2#

一个可能的解决方案是使用内部可变性。通过将Status字段更改为Cell<Status>(或原子类型,如coreband的crossbeam::atomic::AtomicCell),可以删除mut和生存期限制。
假设您将status字段移动到内部类型中;我称之为WriterJobInnerWriterJob现在只拥有一个Arc<WriterJobInner>

pub(crate) struct WriterJob {
    inner: std::sync::Arc<WriterJobInner>,
}

struct WriterJobInner {
    pub status: std::cell::Cell<Status>,
}

struct WriteCommand;

#[derive(Copy, Clone)]
enum Status {
    None,
    Spawned,
    Ready,
}

如果需要,可以为WriterJob实现Deref,以简化某些访问。
您的WriterJob实现现在将更改为:

impl WriterJob {
    pub(crate) fn spawn_writer_job(&self) -> tokio::sync::mpsc::Sender<WriteCommand> {
        let (tx, rx) = tokio::sync::mpsc::channel(10);

        // Clone the Arc and move it to the background thread.
        let inner = self.inner.clone();
        let handle = tokio::spawn(async move {
            inner.writer_job_impl(rx).await;
        });

        // TODO: Set only to Spawned if still None
        self.inner.status.set(Status::Spawned);
        tx
    }

    pub(crate) fn status(&self) -> Status {
        self.inner.status()
    }
}

由于线程不需要传递self-既不是可变的,也不是不变的-错误就会消失。此外,由于status现在是Cell,因此也不需要mut self
同样,您的WriterJobInner也只需要&self

impl WriterJobInner {
    pub async fn writer_job_impl(&self, rx: tokio::sync::mpsc::Receiver<WriteCommand>) {
        // ...
        self.status.set(Status::Ready);
    }

    pub fn status(&self) -> Status {
        self.status.get()
    }
}

unsafe impl Send for WriterJobInner {}
unsafe impl Sync for WriterJobInner {}

缺点是,WriterJobInner类型需要同时是SendSync,才能与Arc<T>一起使用,但无论如何,您都要跨线程使用它。
请注意,在创建线程后将status设置为Spawned是一个争用条件。你可能想尝试原子地设置值,只有当它还没有被设置为其他值时。
在这种方法中,最有用的组合可能是将需要一起更改的所有内容捆绑在一起,然后为其使用外部可变性。
使用上面的RwLock(或类似),例如如inner: Arc<RwLock<..>>,要求您混合使用同步和异步代码(async方法中的std::sync::RwLock)或使您的访问器为async(当使用tokio::sync::RwLock时)。

相关问题