Avoiding deadlock in Rust when multiple spawned threads run code in a loop

Posted by pdkcd3nj on 2023-01-30 in Other

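I am trying to run two threads in parallel and share some data between them. When one of the threads contains a loop statement, the other thread blocks forever while trying to lock the shared data.
However, if I add a line that breaks out of the loop after a certain number of iterations, the lock is released and the work in the other thread starts.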
Rust Playground
Code:

    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::time::Duration;
    
    #[derive(Clone, Copy)]
    struct SomeNetwork {
        is_connected: bool,
    }
    
    impl SomeNetwork {
        fn connection_manager(&mut self) {
            loop {
                // if I exit the loop after a few iterations then the deadlock is removed
                // eg: when I use `for i in 0..10 {` instead of `loop`
    
                println!("connection_manager thread...");
    
                thread::sleep(Duration::from_millis(2000));
            }
        }
    
        fn api_calls(&self) {
            loop {
                if self.is_connected {
                    //make_an_api_call()
                }
    
                println!("api_calls thread...");
    
                thread::sleep(Duration::from_millis(5000));
            }
        }
    
        pub fn start() {
            let self_arc = SomeNetwork {
                is_connected: false,
            };
    
            let self_arc = Arc::new(Mutex::new(self_arc));
            let self_cloned1 = Arc::clone(&self_arc);
            let self_cloned2 = Arc::clone(&self_arc);
    
            thread::Builder::new()
                .spawn(move || {
                    let mut n = self_cloned1.lock().unwrap();
    
                    n.connection_manager();
                })
                .unwrap();
    
            thread::Builder::new()
                .spawn(move || {
                    let n = self_cloned2.lock().unwrap(); // <---- deadlock here
    
                    n.api_calls();
                })
                .unwrap();
    
            loop {
                thread::sleep(Duration::from_millis(5000))
            }
        }
    }
    
    fn main() {
        SomeNetwork::start();
    }

Output:

connection_manager thread...
connection_manager thread...
connection_manager thread...
connection_manager thread...
connection_manager thread...
....

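Once a thread goes to sleep, isn't the underlying operating system responsible for scheduling the other one?
What can I do here so that both threads run in parallel?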


i2loujxw1#

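The problem is that the mutex you created stays locked for as long as connection_manager runs.
In Rust, a mutex wraps the data it protects. Calling lock() blocks the current thread until it can acquire the mutex, and then returns a MutexGuard, which you can think of as a wrapper around a reference to the mutex. The MutexGuard gives you mutable access to the data inside the mutex. Once the MutexGuard is no longer needed, Rust runs its Drop implementation, which unlocks the mutex and lets other threads acquire it.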

// Block until mutex is locked for this thread and return MutexGuard
let mut n = self_cloned1.lock().unwrap();

// Do stuff with the locked mutex
n.connection_manager();

// MutexGuard is no longer needed so it gets dropped and the mutex is released
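The guard lifetime described above can be observed directly. The following is a minimal sketch, not part of the original answer: the helper name `lock_visibility` is made up for illustration, and it uses `try_lock` from a second thread to check whether the mutex is available while the guard is alive and again after it has been dropped.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: reports whether `try_lock` from another thread
/// succeeds (a) while a guard is still alive and (b) after it is dropped.
fn lock_visibility() -> (bool, bool) {
    let data = Arc::new(Mutex::new(0u32));

    // The main thread takes the lock and keeps the guard alive.
    let guard = data.lock().unwrap();

    // A second thread probes the mutex without blocking.
    let probe = Arc::clone(&data);
    let while_held = thread::spawn(move || {
        let ok = probe.try_lock().is_ok();
        ok
    })
    .join()
    .unwrap();

    // Dropping the guard is what unlocks the mutex.
    drop(guard);

    let probe = Arc::clone(&data);
    let after_drop = thread::spawn(move || {
        let ok = probe.try_lock().is_ok();
        ok
    })
    .join()
    .unwrap();

    (while_held, after_drop)
}

fn main() {
    let (while_held, after_drop) = lock_visibility();
    println!("try_lock while guard is alive: {while_held}"); // false
    println!("try_lock after guard is dropped: {after_drop}"); // true
}
```

In the question's code the guard `n` lives for the whole closure, and the closure never returns, so the second situation is never reached.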

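As you can see, if connection_manager never returns, the guard is never dropped, so the mutex stays locked by the first thread that acquired it and the other thread blocks in lock() forever.
You probably want to use the mutex together with a Condvar so that the mutex is released while the thread sleeps.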
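If the threads do not need to signal each other, a simpler alternative to a condvar may be enough: lock the mutex only for the few statements that touch the shared data, and sleep after the guard has been dropped. The sketch below is an assumption about how the question's code could be restructured, not the answerer's method. The `run` function and its `iterations` parameter are hypothetical and exist only so the demo terminates; the sleep times are shortened for the same reason.

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

struct SomeNetwork {
    is_connected: bool,
}

/// Hypothetical demo: runs both loops for `iterations` rounds and returns
/// how many times the API thread saw `is_connected == true`.
fn run(iterations: u32) -> u32 {
    let shared = Arc::new(Mutex::new(SomeNetwork { is_connected: false }));

    let manager_state = Arc::clone(&shared);
    let manager = thread::spawn(move || {
        for _ in 0..iterations {
            {
                // Hold the lock only while updating the shared state.
                let mut n = manager_state.lock().unwrap();
                n.is_connected = true;
            } // guard dropped here, before the thread sleeps
            thread::sleep(Duration::from_millis(10));
        }
    });

    let api_state = Arc::clone(&shared);
    let api = thread::spawn(move || {
        let mut calls = 0;
        for _ in 0..iterations {
            // The temporary guard is dropped at the end of this statement.
            let connected = api_state.lock().unwrap().is_connected;
            if connected {
                calls += 1; // stand-in for make_an_api_call()
            }
            thread::sleep(Duration::from_millis(10));
        }
        calls
    });

    manager.join().unwrap();
    api.join().unwrap()
}

fn main() {
    let calls = run(5);
    println!("API thread saw a live connection {calls} time(s)");
}
```

Because neither thread sleeps while holding the guard, both loops make progress. The exact number of calls depends on scheduling, so the sketch does not claim a specific output.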

Edit:

Here is a rough idea of how to use condvars to handle the connection and a channel to pass jobs to worker threads. Playground Link

use std::sync::{Arc, Mutex, Condvar};
use std::thread::{self, current};
use std::time::Duration;

use crossbeam_channel::{unbounded, Receiver};

#[derive(Clone, Copy)]
struct SomeNetwork {
    is_connected: bool,
}

const TIMEOUT: Duration = Duration::from_secs(5);

impl SomeNetwork {
    fn connect(&mut self) {
        println!("connection_manager thread...");
        self.is_connected = true;
    }

    fn api_calls(&self, job: i32) {
        //println!("api_calls thread...");
        println!("[Worker {:?}] Handling job {}", current().id(), job);
        thread::sleep(Duration::from_millis(50))
    }

    pub fn start_connection_thread(
        self_data: Arc<Mutex<Self>>,
        connect_condvar: Arc<Condvar>,
        worker_condvar: Arc<Condvar>,
    ) {
        thread::Builder::new()
            .spawn(move || {
                let mut guard = self_data.lock().unwrap();

                loop {
                    // Do something with the data
                    if !guard.is_connected {
                        guard.connect();

                        // Notify all workers that the connection is ready
                        worker_condvar.notify_all();
                    }

                    // Use condvar to release mutex and wait until signaled to start again
                    let (new_guard, _) = connect_condvar.wait_timeout(guard, TIMEOUT).unwrap();
                    guard = new_guard;
                }
            })
            .unwrap();
    }
    
    
    pub fn start_worker_thread(
        self_data: Arc<Mutex<Self>>,
        connect_condvar: Arc<Condvar>,
        worker_condvar: Arc<Condvar>,
        requests: Receiver<i32>,
    ) {
        thread::Builder::new()
            .spawn(move || {
                loop {
                
                    // Wait until a request is received
                    let request = requests.recv().unwrap();
                
                    // Lock mutex once we have a request
                    let mut guard = self_data.lock().unwrap();
                
                    // Make sure we are connected before starting tasks
                    while !guard.is_connected {
                        // Wake up 1 connection thread if the connection breaks
                        connect_condvar.notify_one();
                        
                        // Sleep until signaled that the connection has been fixed
                        let (new_guard, _) = worker_condvar.wait_timeout(guard, TIMEOUT).unwrap();
                        guard = new_guard;
                    }

                    // Now that we have verified we are connected, handle the request
                    guard.api_calls(request);
                    
                }
            })
            .unwrap();
    }

    pub fn start() {
        let self_arc = SomeNetwork {
            is_connected: false,
        };

        let self_arc = Arc::new(Mutex::new(self_arc));
        let connect_condvar = Arc::new(Condvar::new());
        let worker_condvar = Arc::new(Condvar::new());

        // Create a channel to send jobs to workers
        let (send, recv) = unbounded();

        Self::start_connection_thread(self_arc.clone(), connect_condvar.clone(), worker_condvar.clone());
        
        // Start some workers
        for _ in 0..5 {
            Self::start_worker_thread(self_arc.clone(), connect_condvar.clone(), worker_condvar.clone(), recv.clone());
        }
        
        // Send messages to workers
        for message in 1..100 {
            // send() returns a Result; it only fails if every receiver is gone
            send.send(message).unwrap();
        }
        

        loop {
            thread::sleep(Duration::from_millis(5000))
        }
    }
}

fn main() {
    SomeNetwork::start();
}
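The code above depends on one property of `Condvar::wait_timeout`: it takes the guard by value, unlocks the mutex while the thread is blocked, and hands back a re-locked guard when it returns. The following is a minimal sketch of that pattern in isolation, using only the standard library. The function name `wait_for_flag` is hypothetical, and the boolean flag stands in for `is_connected`.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical demo: a waiter thread blocks on a condvar until the main
/// thread sets a flag. Returns the flag value the waiter finally observed.
fn wait_for_flag() -> bool {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));

    let waiter_pair = Arc::clone(&pair);
    let waiter = thread::spawn(move || {
        let (lock, cvar) = &*waiter_pair;
        let mut ready = lock.lock().unwrap();
        // Re-check the flag in a loop: waits can time out or wake spuriously.
        while !*ready {
            // The mutex is unlocked while this thread is blocked here and
            // locked again before wait_timeout returns.
            let (guard, _timed_out) = cvar
                .wait_timeout(ready, Duration::from_secs(5))
                .unwrap();
            ready = guard;
        }
        let seen = *ready;
        seen
    });

    let (lock, cvar) = &*pair;
    {
        // If the waiter is already blocked in wait_timeout, this lock()
        // succeeds only because waiting released the mutex.
        let mut ready = lock.lock().unwrap();
        *ready = true;
    }
    cvar.notify_one();

    waiter.join().unwrap()
}

fn main() {
    println!("waiter observed flag: {}", wait_for_flag());
}
```

Whichever thread locks first, the function terminates: either the waiter sees the flag already set, or it waits and is woken by `notify_one` (or by the timeout) and then sees it.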
