Rust通过TCP将序列化的struct从TCP通道发送到多个接收者

u0njafvf  于 2023-11-19  发布在  其他
关注(0)|答案(2)|浏览(126)

我试图通过tcp向多台计算机发送序列化的结构。tcp处理程序接收序列化的结构(字符串类型)。我的问题是.try_iter()将耗尽.try_iter()通道,如果连接了多个客户端,客户端将无法接收相同的结构。我尝试将.try_iter()从单个handle_client()函数中移出,但是没有达到好的效果。谢谢你的时间和帮助!
这就是我目前所拥有的:
(服务器端)

use std::net::{TcpListener, TcpStream};
use std::thread;
use std::io::{Read,Write,Error};
use serde::{Serialize, Deserialize};
use crossbeam_channel::unbounded;

#[derive(Serialize, Deserialize)]
pub struct Serialized {
    pub buffer: Vec<u32>,
    pub timestep: u128,
}

impl Serialized {
    pub fn serialize(buffer: Vec<u32>, timestep: u128) -> String {
        let x = Serialized {
           buffer,
           timestep 
        };
        serde_json::to_string(&x).unwrap()
    }
}

fn handle_client(mut stream: TcpStream, rx: crossbeam_channel::Receiver<String>)-> Result<(), Error> {
    println!("incoming connection from: {}", stream.peer_addr()?);
    loop {
        //receive from channel
        let serialized = match rx.try_iter().last(){
            Some(x) => x,
            None => continue,
        };

        //write to stream
        stream.write(serialized.as_bytes())?;
    }
}

pub fn start_server(rx: crossbeam_channel::Receiver<String>) {
    
    let listener = TcpListener::bind("localhost:8888").expect("Could not bind");
    
    for stream in listener.incoming() {
        let rx = rx.clone();
        match stream {
            Err(e)=> {eprintln!("failed: {}", e)}
            Ok(stream) => {
                
                thread::spawn(move || {
                    handle_client(stream, rx).unwrap_or_else(|error| eprintln!("{:?}", error));
                });
                
            }
        } 

    }
}

字符串
(客户端)

use std::net::TcpStream;
use serde::{Serialize, Deserialize};
use std::error::Error as er;

#[derive(Serialize, Deserialize, Debug)]
pub struct Serialized {
    pub buffer: Vec<u32>,
    pub timestep: u128,
}

fn read_user_from_stream(tcp_stream: &mut TcpStream) -> Result<Serialized, Box<dyn er>> {
    let mut de = serde_json::Deserializer::from_reader(tcp_stream);
    let u = Serialized::deserialize(&mut de)?;

    Ok(u)
}

pub fn start_client() {
    loop {
        let mut stream = TcpStream::connect("localhost:8888").expect("could not connect");
        let serialized = read_user_from_stream(&mut stream).unwrap();
        println!("timestep: {}", serialized.timestep);
    }
}

fn main() {
    start_client();
}

qv7cva1a

qv7cva1a1#

crossbeam does not offer broadcast functionality.crossbeam仅提供生产者-消费者架构;如果您希望将一个项目交付给多个接收者,则需要使用不同的机制。
看来bus机箱提供了你所需要的。

a1o7rhls

a1o7rhls2#

在rust用户板上进行了一些讨论后,我得出了这个解决方案(服务器端):

use std::sync::{Arc, Mutex};
use std::thread;
use std::net::{TcpListener, TcpStream};
use std::io::{Read,Write,Error};
use bus::{Bus, BusReader};

fn main() {
    let mut x: u32 = 0;
    let bus = Bus::<u32>::new(10);

    let bus_mutex = Arc::new(Mutex::new(bus));

    let bus_mutex_cp = Arc::clone(&bus_mutex);
    thread::spawn(move || {
        start_server(bus_mutex_cp);
    });

    //simulation loop
    for _ in 0..99999 {
        x = x + 1;
        println!("Simulation step: {}", x);
        bus_mutex.lock().unwrap().broadcast(x);
        thread::sleep_ms(1000);
    }

    loop {}

}

pub fn start_server(bus_mutex: Arc<Mutex<Bus<u32>>>) {
    
    let listener = TcpListener::bind("0.0.0.0:8888").expect("Could not bind");
    
    for stream in listener.incoming() {
        
        match stream {
            Err(e)=> {eprintln!("failed: {}", e)}
            Ok(stream) => {
                
                let rx = bus_mutex.lock().unwrap().add_rx();
                thread::spawn(move || {
                    handle_client(stream, rx).unwrap_or_else(|error| eprintln!("{:?}", error));
                });
                
            }
        } 

    }
}

fn handle_client(mut stream: TcpStream, mut rx: BusReader<u32>)-> Result<(), Error> {
    println!("incoming connection from: {}", stream.peer_addr()?);
    loop {
        //receive from bus
        let x = rx.recv().unwrap();

        //write to stream
        stream.write(&x.to_string().as_bytes())?;

        thread::sleep_ms(100);
    }
}

字符串

相关问题