在Rust中遇到了时雄和daemonize的问题,如何让它们一起工作?

ubby3x7f  于 2023-04-21  发布在  其他
关注(0)|答案(1)|浏览(162)

我写了一个简单的程序,使用时雄和fern来接受来自TCP的输入,并将其记录到stdio和一个文件中。现在,我想让程序在后台运行在我的远程服务器上,这样它就可以随时记录我是否通过终端连接。所以谷歌了一下,发现daemonize crate似乎很受欢迎。
下面是我的代码:

use std::fs::File;
use std::str;
use std::time::SystemTime;

use fern::colors::{Color, ColoredLevelConfig};

use log::{debug, info, trace, warn};

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

use daemonize::Daemonize;

fn setup_logging(verbosity: u64) -> Result<(), fern::InitError> {
    let mut base_config = fern::Dispatch::new();

    base_config = match verbosity {
        0 => {
            // Let's say we depend on something which whose "info" level messages are too
            // verbose to include in end-user output. If we don't need them,
            // let's not include them.
            base_config
                .level(log::LevelFilter::Info)
                .level_for("overly-verbose-target", log::LevelFilter::Warn)
        }
        1 => base_config
            .level(log::LevelFilter::Debug)
            .level_for("overly-verbose-target", log::LevelFilter::Info),
        2 => base_config.level(log::LevelFilter::Debug),
        _3_or_more => base_config.level(log::LevelFilter::Trace),
    };

    // Separate file config so we can include year, month and day in file logs
    let file_config = fern::Dispatch::new()
        .format(|out, message, record| {
            let colors = ColoredLevelConfig::new().debug(Color::Magenta);
            out.finish(format_args!(
                "[{} {} {}] {}",
                humantime::format_rfc3339_seconds(SystemTime::now()),
                colors.color(record.level()),
                //record.level(),
                record.target(),
                message
            ))
        })
        .chain(fern::log_file("program.log")?);

    let stdout_config = fern::Dispatch::new()
        .format(|out, message, record| {
            let colors = ColoredLevelConfig::new().debug(Color::Magenta);
            // special format for debug messages coming from our own crate.
            if record.level() > log::LevelFilter::Info && record.target() == "cmd_program" {
                out.finish(format_args!(
                    "DEBUG @ {}: {}",
                    humantime::format_rfc3339_seconds(SystemTime::now()),
                    message
                ))
            } else {
                out.finish(format_args!(
                    "[{} {} {}] {}",
                    humantime::format_rfc3339_seconds(SystemTime::now()),
                    colors.color(record.level()),
                    //record.level(),
                    record.target(),
                    message
                ))
            }
        })
        .chain(std::io::stdout());

    base_config
        .chain(file_config)
        .chain(stdout_config)
        .apply()?;

    Ok(())
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let stdout = File::create("/tmp/daemon.out").unwrap();
    let stderr = File::create("/tmp/daemon.err").unwrap();

    let daemonize = Daemonize::new()
        .pid_file("/tmp/test.pid") // Every method except `new` and `start`
        .chown_pid_file(true) // is optional, see `Daemonize` documentation
        .working_directory("/tmp") // for default behaviour.
        .user("nobody")
        .group("daemon") // Group name
        .group(2) // or group id.
        .umask(0o777) // Set umask, `0o027` by default.
        .stdout(stdout) // Redirect stdout to `/tmp/daemon.out`.
        .stderr(stderr) // Redirect stderr to `/tmp/daemon.err`.
        .privileged_action(|| "Executed before drop privileges");

    match daemonize.start() {
        Ok(_) => println!("Success, daemonized"),
        Err(e) => eprintln!("Error, {}", e),
    }

    setup_logging(3).expect("failed to initialize logging.");
    info!("Server starting...");
    let listener = TcpListener::bind("###IP:port###").await?;
    info!("TcpListener bound to ###IP:port###");
    info!("{:?}", listener.local_addr());
    loop {
        println!("Listening for connection.");
        let (mut socket, _) = listener.accept().await?;
        info!("Connection made from client: {:?}", socket.peer_addr());
        println!("Connection made from client: {:?}", socket.peer_addr());

        tokio::spawn(async move {
            let mut buf = vec![0; 1024];

            loop {
                match socket.read(&mut buf).await {
                    // Return value of `Ok(0)` signifies that the remote has
                    // closed
                    Ok(0) => {
                        info!("Client ({:?}) closed the connection", socket.peer_addr());
                        println!("Client ({:?}) closed the connection", socket.peer_addr());
                        return;
                    }
                    Ok(n) => {
                        let s = match str::from_utf8(buf.as_slice()) {
                            Ok(v) => v,
                            Err(e) => {
                                trace!("Invalid UTF-8 sequence: {}", e);
                                return;
                            }
                        };
                        print!("{}", s);
                        info!("{}", s);
                        // Copy the data back to socket
                        // if socket.write_all(&buf[..n]).await.is_err() {
                        //     // Unexpected socket error. There isn't much we can
                        //     // do here so just stop processing.
                        //     return;
                        // }
                    }
                    Err(_) => {
                        debug!("Something went wrong. Socket Error.");
                        // Unexpected socket error. There isn't much we can do
                        // here so just stop processing.
                        return;
                    }
                }
            }
        });
    }
    debug!("Program outside of loop and exiting. Did something go wrong?");
}

好的,代码在let (mut socket, _) = listener.accept().await?;之前都能正常工作。一旦代码到达那里,它就进入了一个黑洞。没有更多的print语句出来。现在,这并不意味着程序失败了。当我telnet进入程序时,它连接正常。我向它发送消息也正常。如果我杀死了rust程序,telnet会说客户端断开连接,但只有当我杀死它时。所以我知道时雄正在接受连接并通过TCP进行交互。它不再与我的代码一起工作。就像在引擎盖下它处理TCP连接,但它忘记通知它返回的Future。
我在其他地方读到过时雄不与fork混合,但我也听说,只要你在Tokio做任何事情之前fork程序,它应该工作得很好,因为它在做任何事情之前就包含在fork中。

//////////////////////////////////////////////////////////////
// Changes made after answer that got the code working
//////////////////////////////////////////////////////////////

fn main() -> Result<(), Box<dyn std::error::Error>> {
let stdout = File::create("/tmp/daemon.out").unwrap();
let stderr = File::create("/tmp/daemon.err").unwrap();

let daemonize = Daemonize::new()
    .pid_file("/tmp/test.pid") // Every method except `new` and `start`
    .chown_pid_file(true) // is optional, see `Daemonize` documentation
    .working_directory("/tmp") // for default behaviour.
    .user("nobody")
    .group("daemon") // Group name
    .group(2) // or group id.
    .umask(0o777) // Set umask, `0o027` by default.
    .stdout(stdout) // Redirect stdout to `/tmp/daemon.out`.
    .stderr(stderr) // Redirect stderr to `/tmp/daemon.err`.
    .privileged_action(|| "Executed before drop privileges");

match daemonize.start() {
    Ok(_) => println!("Success, daemonized"),
    Err(e) => eprintln!("Error, {}", e),
}

tokio::runtime::Builder::new_multi_thread()
    .enable_all()
    .build()
    .unwrap()
    .block_on(async {
        setup_logging(3).expect("failed to initialize logging.");
        main_functionality().await
    })
//debug!("Program outside of loop and exitting. Did something go wrong?");
}

async fn main_functionality() -> Result<(), Box<dyn 
std::error::Error>> {
info!("Server starting...");
let listener = TcpListener::bind("###IP:port###").await?;
info!("TcpListener bound to ###IP:port###");
info!("{:?}", listener.local_addr());
loop {
    println!("Listening for connection.");
    let (mut socket, _) = listener.accept().await?;
    info!("Connection made from client: {:?}", socket.peer_addr());
    println!("Connection made from client: {:?}", socket.peer_addr());

    tokio::spawn(async move {
        let mut buf = vec![0; 1024];

        loop {
            match socket.read(&mut buf).await {
                // Return value of `Ok(0)` signifies that the remote has
                // closed
                Ok(0) => {
                    info!("Client ({:?}) closed the connection", socket.peer_addr());
                    println!("Client ({:?}) closed the connection", socket.peer_addr());
                    return;
                }
                Ok(n) => {
                    let s = match str::from_utf8(buf.as_slice()) {
                        Ok(v) => v,
                        Err(e) => {
                            trace!("Invalid UTF-8 sequence: {}:[{}]", e, n);
                            return;
                        }
                    };
                    print!("{}", s);
                    info!("{}", s);
                    // Copy the data back to socket
                    // if socket.write_all(&buf[..n]).await.is_err() {
                    //     // Unexpected socket error. There isn't much we can
                    //     // do here so just stop processing.
                    //     return;
                    // }
                }
                Err(_) => {
                    debug!("Something went wrong. Socket Error.");
                    // Unexpected socket error. There isn't much we can do
                    // here so just stop processing.
                    return;
                }
            }
        }
    });
}
}

当然,现在这个工作。我不得不怀疑它是否有用这样做。在问这个问题和获得帮助来修复它之间的时间,我想起了像systemd这样的实用程序,为你做这一切。

k10s72fa

k10s72fa1#

#[tokio::main]意味着时雄在您输入main函数之前启动,因此在daemonize有机会fork程序之前→从main函数中删除#[tokio::main],并在daemonize之后调用tokio_main函数来运行Tokio内容:

fn main() -> io::Result<()> {
    let stdout = File::create("/tmp/daemon.out").unwrap();
    let stderr = File::create("/tmp/daemon.err").unwrap();

    let daemonize = Daemonize::new()
        .pid_file("/tmp/test.pid") // Every method except `new` and `start`
        .chown_pid_file(true) // is optional, see `Daemonize` documentation
        .working_directory("/tmp") // for default behaviour.
        .user("nobody")
        .group("daemon") // Group name
        .group(2) // or group id.
        .umask(0o777) // Set umask, `0o027` by default.
        .stdout(stdout) // Redirect stdout to `/tmp/daemon.out`.
        .stderr(stderr) // Redirect stderr to `/tmp/daemon.err`.
        .privileged_action(|| "Executed before drop privileges");

    match daemonize.start() {
        Ok(_) => println!("Success, daemonized"),
        Err(e) => eprintln!("Error, {}", e),
    }

    tokio_main()
}

#[tokio::main]
async fn tokio_main() -> io::Result<()> {
    setup_logging(3).expect("failed to initialize logging.");
    info!("Server starting...");
    let listener = TcpListener::bind("45.33.110.226:2999").await?;
    info!("TcpListener bound to 45.33.110.226:2999");
    info!("{:?}", listener.local_addr());
    loop {
        println!("Listening for connection.");
        let (mut socket, _) = listener.accept().await?;
        info!("Connection made from client: {:?}", socket.peer_addr());
        println!("Connection made from client: {:?}", socket.peer_addr());

        tokio::spawn(async move {
            let mut buf = vec![0; 1024];

            loop {
                match socket.read(&mut buf).await {
                    // Return value of `Ok(0)` signifies that the remote has
                    // closed
                    Ok(0) => {
                        info!("Client ({:?}) closed the connection", socket.peer_addr());
                        println!("Client ({:?}) closed the connection", socket.peer_addr());
                        return;
                    }
                    Ok(n) => {
                        let s = match str::from_utf8(buf.as_slice()) {
                            Ok(v) => v,
                            Err(e) => {
                                trace!("Invalid UTF-8 sequence: {}", e);
                                return;
                            }
                        };
                        print!("{}", s);
                        info!("{}", s);
                        // Copy the data back to socket
                        // if socket.write_all(&buf[..n]).await.is_err() {
                        //     // Unexpected socket error. There isn't much we can
                        //     // do here so just stop processing.
                        //     return;
                        // }
                    }
                    Err(_) => {
                        debug!("Something went wrong. Socket Error.");
                        // Unexpected socket error. There isn't much we can do
                        // here so just stop processing.
                        return;
                    }
                }
            }
        });
    }
    debug!("Program outside of loop and exiting. Did something go wrong?");
}

相关问题