rust 相互调用的相互引用结构的最佳实践

7fyelxc5  于 2023-08-05  发布在  其他
关注(0)|答案(4)|浏览(115)

几个小时后,我仍然没有找到一个(好的)方法来解决这个问题,即结构A使用A配置的助手结构B,以便B对A有一个异步回调:

// Helper for struct DeviceModel. An async timer, such as a thread.
trait AsyncTimer {
    // Set a timeout. After that, code code behind callback should be 
    // executed. The closure owns Arc<DeviceModel> to operate on DeviceModel
    // and to ensure its lifetime.
    fn configure_timeout(&self, duration: Duration, callback: Box<dyn Fn()>);
}

struct AsyncTimerImpl {
    // impl details for thread handling 
}

impl Asynctimer for AsyncTimerImpl {
   fn configure_timeout(&self, duration: Duration, callback: Box<dyn Fn()>) {
     // set next timeout when the thread should call `callback`
   }
}

struct DeviceModel {
    // Device model owns this relation. AsyncTimer must also be 
    able to asyncorunously call something on device model.
    async_timer: Box<dyn AsyncTimer>
}

impl DeviceModel {
    fn setup_next_timeout(&self) {
      let this = /* ... */; // actually, I need an Arc here instead of &self...
      let cb = Box::new(move || this.handle_timeout() );
      async_timer.configure_timeout(Duration::from_secs(1), cb);
    }
    fn handle_timeout(&self) {}
}

字符串
问题如下:DeviceModel可以调用self.async_timer.configure_timeout(),这将影响AsyncTimerImpl结构体后面运行的线程的行为。当线程触发下一个事件时,它应该调用DeviceModel上的操作。DevideModel应该负责在其自身上定义任意计算。
但是,我遇到了循环依赖问题,因为DeviceModel需要更改AsyncTimer的状态,并且传递给AsyncTimer的回调必须调用DeviceModel内部的代码:
1.不能在两个方向上都使用Arc<>,因为它可能会泄漏内存
1.在DeviceModel中有Arc<AsyncTimer>,在AsyncTimerImpl中有Weak<DeviceModel>,解决了1),但仍然需要对两个结构进行非常难看的惰性初始化。
我希望DeviceModel能够完全控制configure_timeout以及回调操作,但这似乎不可能解决。
有什么想法吗,我的 rust 齿动物们?PS:AsyncTimer不应该比相应的DeviceModel活得更长。有一辈子的联系就好了。

x8diyxa7

x8diyxa71#

幸运的是,Rust有适合你的东西:Arc::new_cyclic。它一起使用ArcWeak,专门设计用于处理循环数据结构,就像这里一样。
您的代码可能看起来有点像这样:

use std::sync::{Arc, Weak};
use std::time::Duration;

// Helper for struct DeviceModel. An async timer, such as a thread.
trait AsyncTimer {
    // Set a timeout. After that, code code behind callback should be
    // executed. The closure owns Arc<DeviceModel> to operate on DeviceModel
    // and to ensure its lifetime.
    fn configure_timeout(&self, duration: Duration, callback: Box<dyn Fn()>);
}

struct AsyncTimerImpl {
    device_model: Weak<DeviceModel>,
    // impl details for thread handling
}

impl AsyncTimer for AsyncTimerImpl {
    fn configure_timeout(&self, duration: Duration, callback: Box<dyn Fn()>) {
        // set next timeout when the thread should call `callback`
    }
}

struct DeviceModel {
    // Device model owns this relation. AsyncTimer must also be able to asyncorunously call something on device model.
    async_timer: Arc<dyn AsyncTimer>,
}

impl DeviceModel {
    fn setup_next_timeout(self: Arc<Self>) {
        let this = Arc::clone(&self);
        let cb = Box::new(move || this.handle_timeout());
        self.async_timer
            .configure_timeout(Duration::from_secs(1), cb);
    }
    fn handle_timeout(&self) {}
}

fn main() {
    let dm = Arc::new_cyclic(|me| DeviceModel {
        async_timer: Arc::new(AsyncTimerImpl {
            device_model: me.clone(),
        }),
    });
}

字符串
唯一的限制是这需要Rust 1.60或更新版本,但除非你有很强的兼容性要求,否则这应该可以正常工作。

x33g5p2x

x33g5p2x2#

我发现了一个被诅咒的变种,它使用了不安全的Rust,但是我认为,它是安全的(miri报告说它不安全,我在那里做的事情确实非常受诅咒)。但是考虑到控制流,它应该可以工作。
TL;DR:trait AsyncTimer有一个函数fn set_timeout(&self, timestamp: Duration, cb: Box<dyn Fn() + Send>)。要在device_model.set_timeout()中创建回调,可以使用以下简单的方法

fn set_next_timeout(&self) {
        let cb = {
            // This hack is only valid, as the device model and it's thread have a shorter
            // lifetime than the device model. This is enforced by control flow and not by
            // Rust's type system and lifetime logic.
            //
            // Otherwise, it is far from trivial to move a callback to the device model.
            let self_cursed: &'static DeviceModel<T> = unsafe { &*core::ptr::addr_of!(*self) };
            move || {
                self_cursed.on_timeout();
            }
        };
        let cb = Box::new(cb);
        self.timer.set_timeout(Duration::new(1, 0), cb);
    }

字符串
完整的代码可以在Rust Playground或下面找到。它产生以下输出:

Timer has fired!
AsyncTimer called callback in DeviceModel
Timer has fired!
AsyncTimer called callback in DeviceModel
Stopped Thread
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::thread::sleep;
use std::time::Duration;

/// An async timer that is owned by a device.
trait AsyncTimer: 'static {
    fn new() -> Self
    where
        Self: Sized;
    fn abort_timeout(&self);
    fn set_timeout(&self, timestamp: Duration, cb: Box<dyn Fn() + Send>);
}

struct DeviceModel<T: AsyncTimer> {
    timeout_counter: AtomicU64,
    timer: T,
}

struct AsyncTimerImpl {
    thread_handle: Option<std::thread::JoinHandle<()>>,
    mutex_condvar: Arc<(Mutex<Option<(Duration, Box<dyn Fn() + Send>)>>, Condvar)>,
    exit_thread: Arc<AtomicBool>,
}

impl AsyncTimerImpl {
    fn new() -> Self {
        let mutex_condvar: Arc<(Mutex<Option<(Duration, Box<dyn Fn() + Send>)>>, Condvar)> =
            Arc::new((Mutex::new(None), Condvar::new()));
        let exit_thread = Arc::new(AtomicBool::new(false));

        let thread_handle = {
            let mutex_condvar = mutex_condvar.clone();
            let exit_thread = exit_thread.clone();

            std::thread::spawn(move || loop {
                if exit_thread.load(Ordering::SeqCst) {
                    break;
                }

                let mut guard = mutex_condvar.0.lock().unwrap();
                if let Some((duration, cb)) = guard.take() {
                    let _unused = mutex_condvar.1.wait_timeout(guard, duration).unwrap();
                    println!("Timer has fired!");
                    cb();
                } else {
                    let _unused = mutex_condvar.1.wait(guard).unwrap();
                }
            })
        };

        Self {
            thread_handle: Some(thread_handle),
            mutex_condvar,
            exit_thread,
        }
    }
}

impl AsyncTimer for AsyncTimerImpl {
    fn new() -> Self
    where
        Self: Sized,
    {
        AsyncTimerImpl::new()
    }

    fn abort_timeout(&self) {
        let _ = self.mutex_condvar.0.lock().unwrap().take();
        self.mutex_condvar.1.notify_one();
    }

    fn set_timeout(&self, duration: Duration, cb: Box<dyn Fn() + Send>) {
        let _ = self.mutex_condvar.0.lock().unwrap().replace((duration, cb));
        self.mutex_condvar.1.notify_one();
    }
}

impl<T: AsyncTimer> DeviceModel<T> {
    fn new() -> Self {
        Self {
            timeout_counter: AtomicU64::new(0),
            timer: T::new(),
        }
    }

    fn set_next_timeout(&self) {
        let cb = {
            // This hack is only valid, as the device model and it's thread have a shorter
            // lifetime than the device model. This is enforced by control flow and not by
            // Rust's type system and lifetime logic.
            //
            // Otherwise, it is far from trivial to move a callback to the device model.
            let self_cursed: &'static DeviceModel<T> = unsafe { &*core::ptr::addr_of!(*self) };
            move || {
                self_cursed.on_timeout();
            }
        };
        let cb = Box::new(cb);
        self.timer.set_timeout(Duration::new(1, 0), cb);
    }

    fn on_timeout(&self) {
        self.timeout_counter.fetch_add(1, Ordering::SeqCst);
        println!("AsyncTimer called callback in DeviceModel");
    }
}

impl Drop for AsyncTimerImpl {
    fn drop(&mut self) {
        self.exit_thread.store(true, Ordering::SeqCst);
        self.mutex_condvar.1.notify_one();
        self.thread_handle.take().unwrap().join().unwrap();
        println!("Stopped Thread");
    }
}

// Safety: DeviceModel uses interior mutability
unsafe impl<T: AsyncTimer> Send for DeviceModel<T> {}
unsafe impl<T: AsyncTimer> Sync for DeviceModel<T> {}

fn main() {
    let model = DeviceModel::<AsyncTimerImpl>::new();

    model.set_next_timeout();
    sleep(Duration::from_secs(1));
    model.set_next_timeout();
    sleep(Duration::from_secs(1));
    model.set_next_timeout();
}

的数据

flmtquvp

flmtquvp3#

我不确定我完全理解你的问题。如果我是对的,您正在寻找一种优雅的方式来设置许多异步任务(在单独的线程中),这些任务也可以发出另一个任务。所有这些都必须在DeviceModel字段中定义的数据上完成。
如果是这样的话,我建议您考虑通道(https://doc.rust-lang.org/rust-by-example/std_misc/channels.html)并以这种方式管理您的DeviceModel

struct DeviceModel {
    data: Arc<Mutex<Data>>,
    tx: Sender<Task>,
    rx: Receiver<Task>,
}

字符串
您应该将data传递到任务中,以便它可以在那里更改。如果一个任务需要发射另一个任务,它通过定义为txrx的通道传输新任务的参数,主线程产生新线程。所有关于data的逻辑都在Data中实现。由于Rust闭包支持Send trait,因此可以直接在通道中传递它们(在我的示例中作为Task结构的一部分)。

struct Task<F: FnOnce() + Send + 'static> {
    closure: F,
    duration: ...
    ...
}


我相信这个模式是非常清晰的,它可以让你避免错误和混乱的代码。

wvmv3b1j

wvmv3b1j4#

在@bk2204提到的Arc::new_cyclic的帮助下,我可以满意地解决我的问题。所需的复杂性在合理的范围内。
我做了一个改变:回调不是传递Box<Fn()>,而是在AsyncTimer::on_timeout方法上定义。这“感觉”就像一个更好的设计。
Rust playground上的代码:Playground link

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::thread::{sleep, JoinHandle};
use std::time::Duration;

/// An async timer that is owned by a [`DeviceModel`].
trait AsyncTimer: 'static {
    fn new(device: Weak<DeviceModel>) -> Arc<Self>
    where
        Self: Sized;
    fn abort_timeout(&self);
    fn set_timeout(&self, timestamp: Duration);
    fn device(&self) -> Arc<DeviceModel>;
    fn on_timeout(&self) {
        self.device().on_timeout();
    }
}

/// A device that owns a [`AsyncTimer`].
struct DeviceModel {
    timeout_counter: AtomicU64,
    timer: Arc<dyn AsyncTimer>,
}

/// An implementation of [`AsyncTimer`] that creates a thread that fires the
/// configured timer events. It has a cyclic self-reference so that the thread
/// can access the timer itself as well to invoke the corresponding callback on
/// the device.
struct AsyncThreadTimer {
    device: Weak<DeviceModel>,
    // Option so that it can be joined on Drop.
    thread_handle: Option<JoinHandle<()>>,
    mutex_condvar: Arc<(Mutex<Option<Duration>>, Condvar)>,
    exit_thread: Arc<AtomicBool>,
}

impl AsyncThreadTimer {
    fn new(device: Weak<DeviceModel>) -> Arc<Self> {
        let mutex_condvar = Arc::new((Mutex::new(None), Condvar::new()));
        let exit_thread = Arc::new(AtomicBool::new(false));
        Arc::new_cyclic(|self_ref| Self {
            device,
            thread_handle: Some(Self::spawn_thread(
                mutex_condvar.clone(),
                exit_thread.clone(),
                self_ref.clone(),
            )),
            mutex_condvar,
            exit_thread,
        })
    }

    fn spawn_thread(
        mutex_condvar: Arc<(Mutex<Option<Duration>>, Condvar)>,
        exit_thread: Arc<AtomicBool>,
        self_ref: Weak<Self>,
    ) -> JoinHandle<()> {
        std::thread::spawn(move || loop {
            if exit_thread.load(Ordering::SeqCst) {
                break;
            }

            let mut guard = mutex_condvar.0.lock().unwrap();
            if let Some(duration) = guard.take() {
                let _unused = mutex_condvar.1.wait_timeout(guard, duration).unwrap();

                if exit_thread.load(Ordering::SeqCst) {
                    break;
                }

                println!("Timer has fired!");
                self_ref
                    .upgrade()
                    .expect("the object should live at least as long as its managed thread")
                    .on_timeout();
            } else {
                let _unused = mutex_condvar.1.wait(guard).unwrap();
            }
        })
    }
}

impl AsyncTimer for AsyncThreadTimer {
    fn new(device: Weak<DeviceModel>) -> Arc<Self> {
        AsyncThreadTimer::new(device)
    }

    fn abort_timeout(&self) {
        let _ = self.mutex_condvar.0.lock().unwrap().take();
        self.mutex_condvar.1.notify_one();
    }

    fn set_timeout(&self, duration: Duration) {
        let _ = self.mutex_condvar.0.lock().unwrap().replace(duration);
        self.mutex_condvar.1.notify_one();
    }

    fn device(&self) -> Arc<DeviceModel> {
        self.device.upgrade().unwrap()
    }
}

impl DeviceModel {
    fn new<T: AsyncTimer>() -> Arc<Self> {
        Arc::new_cyclic(|self_ref| {
            let timer = T::new(self_ref.clone());
            Self {
                timeout_counter: AtomicU64::new(0),
                timer,
            }
        })
    }

    fn set_next_timeout(&self) {
        self.timer.set_timeout(Duration::new(1, 0));
    }

    fn on_timeout(&self) {
        self.timeout_counter.fetch_add(1, Ordering::SeqCst);
        println!("AsyncTimer called callback in DeviceModel");
    }
}

impl Drop for AsyncThreadTimer {
    fn drop(&mut self) {
        self.exit_thread.store(true, Ordering::SeqCst);
        self.mutex_condvar.1.notify_one();
        self.thread_handle.take().unwrap().join().unwrap();
        println!("Stopped Thread");
    }
}

// Safety: DeviceModel uses interior mutability
unsafe impl Send for DeviceModel {}
unsafe impl Sync for DeviceModel {}

fn main() {
    let model = DeviceModel::new::<AsyncThreadTimer>();

    model.set_next_timeout();
    sleep(Duration::from_secs(1));
    model.set_next_timeout();
    // We should see just two fired timeouts because of this
    model.timer.abort_timeout();
    sleep(Duration::from_secs(1));
    model.set_next_timeout();
    sleep(Duration::from_secs(2));
}

字符串

相关问题