Rust:应用程序陷入死锁

zaqlnxep  于 2023-05-17  发布在  其他
关注(0)|答案(1)|浏览(151)

我是Rust的新手,目前正在学习并发章节。在我当前的实现中,我创建了两个trait,即StartThread(在start方法中创建新线程,该方法在内部调用run方法)和TerminateThread(用于终止线程;有两种方法,即stopwait)。当从主线程调用stop方法时,我的实现目前陷入死锁。
thread.rs

use std::sync::{Arc, Mutex};
use std::thread;

pub struct RunningThreadInterface<T> {
    pub instance: Arc<Mutex<T>>,
    pub thread_join_handle: thread::JoinHandle<()>,
}

pub trait StartThread<T> {
    fn start(self, thread_ID: String) -> RunningThreadInterface<T>;
    fn run(&mut self);
}

pub trait TerminateThread {
    fn stop(&mut self);
    fn wait(self);
}

NetworkReceiver.rs

use std::{thread, time};
use std::sync::{atomic::{AtomicBool, Ordering}, Arc, Mutex};

use crate::thread::*;

#[derive(Default)]
pub struct NetworkReceiverThread {
    thread_ID: String,
    terminate_flag: AtomicBool,
}

impl NetworkReceiverThread {
    pub fn new() -> NetworkReceiverThread {
        NetworkReceiverThread {
            thread_ID: String::from(""),
            terminate_flag: AtomicBool::new(false),
        }
    }
}

impl StartThread<NetworkReceiverThread> for NetworkReceiverThread {
    fn start(mut self, thread_ID: String) -> RunningThreadInterface<NetworkReceiverThread> {
        self.thread_ID = thread_ID.clone();

        let network_receiver = Arc::new(Mutex::new(self));
        RunningThreadInterface {
            instance: Arc::clone(&network_receiver),
            thread_join_handle: thread::Builder::new().name(thread_ID).spawn(move || network_receiver.lock().unwrap().run()).ok().unwrap(),
        }
    }

    fn run(&mut self) {
        // let mut buff: [u8; 2048] = [0; 2048];

        while !self.terminate_flag.load(Ordering::SeqCst) {
            // receive network data and put into queue(will be processed by processor thread)
            println!("receiver thread");
            std::thread::sleep(time::Duration::from_secs(1));
        }
    }
}

impl TerminateThread for RunningThreadInterface<NetworkReceiverThread> {
    fn stop(&mut self) {
        self.instance.lock().unwrap().terminate_flag.store(true, Ordering::SeqCst);
    }

    fn wait(self) {
        self.thread_join_handle.join();
    }
}

main.rs

mod thread;
mod NetworkReceiver;

use std::time;
use thread::*;
use NetworkReceiver::NetworkReceiverThread;

fn main() {
    let network_receiver = NetworkReceiverThread::new();

    let mut network_receiver: RunningThreadInterface<NetworkReceiverThread> = network_receiver.start(String::from("NT"));

    std::thread::sleep(time::Duration::from_secs(5));

    network_receiver.stop();
    network_receiver.wait();
}

cargo.toml

[package]
name = "ThreadChapter"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
j9per5c4

j9per5c41#

start()中的锁是临时的,它在语句结束时被删除。所以在run()运行的整个过程中,锁都是被持有的,当您试图获取它时,它仍然被持有。
解决方案取决于您的要求。在这个简单的例子中,根本不需要使用Mutex,因为AtomicBool允许通过共享引用进行赋值:
thread.rs:

use std::sync::Arc;
use std::thread;

pub struct RunningThreadInterface<T> {
    pub instance: Arc<T>,
    pub thread_join_handle: thread::JoinHandle<()>,
}

pub trait StartThread<T> {
    fn start(self, thread_ID: String) -> RunningThreadInterface<T>;
    fn run(&self);
}

pub trait TerminateThread {
    fn stop(&mut self);
    fn wait(self);
}

NetworkReceiver.rs:

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::{thread, time};

use crate::thread::*;

#[derive(Default)]
pub struct NetworkReceiverThread {
    thread_ID: String,
    terminate_flag: AtomicBool,
}

impl NetworkReceiverThread {
    pub fn new() -> NetworkReceiverThread {
        NetworkReceiverThread {
            thread_ID: String::from(""),
            terminate_flag: AtomicBool::new(false),
        }
    }
}

impl StartThread<NetworkReceiverThread> for NetworkReceiverThread {
    fn start(mut self, thread_ID: String) -> RunningThreadInterface<NetworkReceiverThread> {
        self.thread_ID = thread_ID.clone();

        let network_receiver = Arc::new(self);
        RunningThreadInterface {
            instance: Arc::clone(&network_receiver),
            thread_join_handle: thread::Builder::new()
                .name(thread_ID)
                .spawn(move || network_receiver.run())
                .ok()
                .unwrap(),
        }
    }

    fn run(&self) {
        // let mut buff: [u8; 2048] = [0; 2048];

        while !self.terminate_flag.load(Ordering::SeqCst) {
            // receive network data and put into queue(will be processed by processor thread)
            println!("receiver thread");
            std::thread::sleep(time::Duration::from_secs(1));
        }
    }
}

impl TerminateThread for RunningThreadInterface<NetworkReceiverThread> {
    fn stop(&mut self) {
        self.instance.terminate_flag.store(true, Ordering::SeqCst);
    }

    fn wait(self) {
        self.thread_join_handle.join();
    }
}

相关问题