rust 如何应用子类型化和方差概念

wljmcqd8  于 2023-08-05  发布在  其他
关注(0)|答案(1)|浏览(97)

我是Rust的新手,我目前面临的问题与subtyping and variance概念有关(只是猜测,根据cargo在构建时显示的帮助消息)。

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, mpsc, Mutex};
use std::thread;

trait Draw {
    fn draw(&self);
}

#[derive(Default)]
struct Button {
    
}

impl Draw for Button {
    fn draw(&self) {
        println!("draw button");
    }
}

#[derive(Default)]
struct SelectionBox {
    
}

impl Draw for SelectionBox {
    fn draw(&self) {
        println!("draw selection box");
    }
}

#[derive(Default)]
struct TextField {
    
}

impl Draw for TextField {
    fn draw(&self) {
        println!("draw text field");
    }
}

pub struct RunningThreadInterface<T> {
    pub instance: Arc<T>,
    pub thread_join_handle: thread::JoinHandle<()>,
}

pub trait StartThread<T> {
    fn start(self, thread_id: String) -> RunningThreadInterface<T>;
    fn run(&self);
}

pub trait TerminateThread {
    fn stop(&mut self);
    fn wait(self);
}

struct Screen<'a> {
    widgets: Mutex<Vec<&'a (dyn Draw + Send + Sync)>>,
    rx: Mutex<mpsc::Receiver<String>>,
    terminate_flag: AtomicBool,
}

impl<'a> Screen<'a> {
    fn new(rx: mpsc::Receiver<String>) -> Screen<'a> {
        Screen {
            widgets: Mutex::new(Vec::new()),
            rx: Mutex::new(rx),
            terminate_flag: AtomicBool::new(false),
        }
    }
    
    fn add(&mut self, widget: &'a (dyn Draw + Send + Sync)) {
        self.widgets.lock().unwrap().push(widget);
    }
    
    fn draw_widgets(&self) {
        for widget in &*self.widgets.lock().unwrap() {
            widget.draw();
        }
    }
}

impl<'a> StartThread<Screen<'a>> for Screen<'a> {
    fn start(self, thread_id: String) -> RunningThreadInterface<Screen<'a>> {
        let screen = Arc::new(self);
        RunningThreadInterface {
            instance: Arc::clone(&screen),
            thread_join_handle: thread::Builder::new().name(thread_id).spawn(move || screen.run()).ok().unwrap(),
        }
    }

    fn run(&self) {
        while !self.terminate_flag.load(Ordering::SeqCst) {
            self.rx.lock().unwrap().recv().unwrap();
        }
    }
}

impl<'a> TerminateThread for RunningThreadInterface<Screen<'a>> {
    fn stop(&mut self) {
        self.instance.terminate_flag.store(true, Ordering::SeqCst);
    }

    fn wait(self) {
        self.thread_join_handle.join();
    }
}

fn main() {
    let button: Button = Default::default();
    let selection_box: SelectionBox = Default::default();
    let text_field: TextField = Default::default();
    
    let (_tx, rx) = mpsc::channel();
    
    let mut screen = Screen::new(rx);
    screen.add(&button);
    screen.add(&selection_box);
    screen.add(&text_field);
    
    screen.draw_widgets();
    
    println!("");
    
    button.draw();
    selection_box.draw();
    text_field.draw();
}

字符串
错误类型

error[E0521]: borrowed data escapes outside of method
  --> src/main.rs:90:33
   |
85 | impl<'a> StartThread<Screen<'a>> for Screen<'a> {
   |      -- lifetime `'a` defined here
86 |     fn start(self, thread_id: String) -> RunningThreadInterface<Screen<'a>> {
   |              ---- `self` is a reference that is only valid in the method body
...
90 |             thread_join_handle: thread::Builder::new().name(thread_id).spawn(move || screen.run()).ok().unwrap(),
   |                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |                                 |
   |                                 `self` escapes the method body here
   |                                 argument requires that `'a` must outlive `'static`
   |
   = note: requirement occurs because of the type `Screen<'_>`, which makes the generic argument `'_` invariant
   = note: the struct `Screen<'a>` is invariant over the parameter `'a`
   = help: see <https://doc.rust-lang.org/nomicon/subtyping.html> for more information about variance

For more information about this error, try `rustc --explain E0521`.

hmtdttj4

hmtdttj41#

看起来这个问题是由传递一些变量给thread::spawn引起的,这要求接收到的变量具有'static生命周期(因为编译器无法证明线程不会永远存在,也无法证明传递的引用在线程生命周期内是否正常)。我知道3种方法来解决当我们在稳定的Rust中向thread::spawn传递具有非静态生命周期的变量时发生的问题

  • 使用作用域线程(这不适合这种情况,因为我们不想在start中阻塞)
fn do_something(some_bool: &AtomicBool) {
    // This code causes error
    // borrowed data escapes outside of function [E0521] `some_bool` escapes the function body here
    // thread::spawn(move || {
    //     some_bool.store(true, Ordering::Relaxed);
    // });

    // This code blocks current thread until all spawned in `scope` threads finish execution
    thread::scope(|scope| {
        scope.spawn(move || {
            some_bool.store(true, Ordering::Relaxed);
        });
    });
    // Current thread will be blocked and code will reach this place only after all
    // spawned in `scope` threads finish execution
}

字符串

  • 传递给thread::spawn的变量需要静态生存期(我们希望避免)
fn do_something_static_lifetime(some_bool: &'static AtomicBool) {
    thread::spawn(move || {
        some_bool.store(true, Ordering::Relaxed);
    });
}

  • 使用unsafe和强制线程,我们希望生成的线程永远不会超过某些类型的生命周期,通过实现逻辑,这将强制线程在Drop中完成执行(这很可能是作者想要的)
struct MinimalEventLoopScheduler<'event_loop_scheduler> {
    action_queue: Mutex<VecDeque<Box<dyn FnOnce() + Send + 'event_loop_scheduler>>>,
    is_drop_finished: AtomicBool,
    is_drop_started: AtomicBool,
}

impl<'event_loop_scheduler> Drop for MinimalEventLoopScheduler<'event_loop_scheduler> {
    fn drop(&mut self) {
        // We need Release to prevent code from places before `drop()`
        // from being reordered after this line
        self.is_drop_started.store(true, Ordering::Release);

        // We need `Ordering::Acquire` to prevent compiler from reordering loop below fence to
        // places above before `self.is_drop_started.store(true, Ordering::Relaxed);`,
        // and also to prevent `self.is_drop_started.store(true, Ordering::Relaxed);`
        // from being reordered to places below(especially after loop)
        fence(Ordering::Acquire);

        loop {
            // Waiting until thread processes all tasks and finishes execution
            if self.is_drop_finished.load(Ordering::Relaxed) {
                break;
            }
        }

        // We need `AcqRel` to prevent loop from being reordered
        // to places after freeing memory and to prevent freeing
        // memory from being reordered to places before loop 
        fence(Ordering::AcqRel);

        // Memory will be freed here
    }
}

unsafe fn extend_action_queue_lifetime<'event_loop_scheduler>(
    t: &Mutex<VecDeque<Box<dyn FnOnce() + Send + 'event_loop_scheduler>>>,
) -> &'static Mutex<VecDeque<Box<dyn FnOnce() + Send + 'static>>> {
    transmute::<
        &Mutex<VecDeque<Box<dyn FnOnce() + Send + 'event_loop_scheduler>>>,
        &Mutex<VecDeque<Box<dyn FnOnce() + Send + 'static>>>,
    >(t)
}

unsafe fn extend_reference_lifetime<'t, T>(t: &T) -> &'static T {
    transmute::<&T, &T>(t)
}

impl<'event_loop_scheduler> MinimalEventLoopScheduler<'event_loop_scheduler> {
    pub fn new() -> Self {
        let event_loop_scheduler = Self {
            action_queue: Mutex::new(VecDeque::new()),
            is_drop_finished: AtomicBool::new(false),
            is_drop_started: AtomicBool::new(false),
        };

        event_loop_scheduler.start_event_loop();

        event_loop_scheduler
    }

    pub fn schedule(&self, action: impl FnOnce() + Send + 'event_loop_scheduler) {
        let action_box = Box::new(action);
        self.action_queue.lock().unwrap().push_back(action_box);
    }

    fn start_event_loop(&self) {
        // Without unsafe code there will be error
        // It is ok to use this unsafe logic, because we know for sure that thread won't outlive
        // `MinimalEventLoopScheduler`, because `Drop` implementation waits until thread is dropped,
        // so reference will outlive thread
        let action_queue = unsafe { extend_action_queue_lifetime(&self.action_queue) };
        let is_drop_finished = unsafe { extend_reference_lifetime(&self.is_drop_finished) };
        let is_drop_started = unsafe { extend_reference_lifetime(&self.is_drop_started) };

        thread::spawn(move || {
            let mut action_option;
            'event_loop: loop {
                {
                    // We do it in separate scope to decrease time under lock
                    action_option = action_queue.lock().unwrap().pop_front();
                }

                match action_option {
                    None => {
                        // It is ok to use `Relaxed` here, because it can't be reordered without
                        // breaking single-threaded logic, and we don't read/store non-atomic memory in `if` branch
                        if is_drop_started.load(Ordering::Relaxed) {
                            // We don't need to use `Release`, because we don't have memory
                            // changes which happen before `store` and which we want to make
                            // accessible to other threads(and which weren't made accessible by `lock`),
                            // and compiler can not reorder `action()` from previous iteration, because `action_queue.lock()`
                            // prevents reordering before and after itself, and compiler can reorder
                            // only next 2 lines(store and break) without breaking single-threaded logic,
                            // and reordering them is fine, since they don't have non-atomic memory
                            // which can be accessed from multiple threads
                            is_drop_finished.store(true, Ordering::Relaxed);
                            break 'event_loop;
                        }
                    }
                    Some(action) => {
                        action();
                    }
                }
            }
        });
    }
}

相关问题