diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bin/einhyrningsinsctl.rs | 3 | ||||
-rw-r--r-- | src/main.rs | 120 |
2 files changed, 79 insertions, 44 deletions
diff --git a/src/bin/einhyrningsinsctl.rs b/src/bin/einhyrningsinsctl.rs index a727c2f..03d6d51 100644 --- a/src/bin/einhyrningsinsctl.rs +++ b/src/bin/einhyrningsinsctl.rs @@ -20,13 +20,10 @@ extern crate json; extern crate getopts; -extern crate log; -extern crate env_logger; extern crate nix; extern crate timer; extern crate time; extern crate chan_signal; -extern crate url; extern crate rustyline; use std::io; diff --git a/src/main.rs b/src/main.rs index eb3dc9c..0120599 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,18 +16,16 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#[macro_use] -extern crate chan; +#[macro_use] extern crate chan; +#[macro_use] extern crate slog; +extern crate slog_syslog; +extern crate slog_term; extern crate json; - extern crate getopts; -extern crate log; -extern crate env_logger; extern crate nix; extern crate timer; extern crate time; extern crate chan_signal; -extern crate url; use std::io::prelude::*; use std::io::{BufReader, BufWriter}; @@ -44,13 +42,14 @@ use std::net::TcpListener; use std::net::ToSocketAddrs; use std::os::unix::net::{UnixStream, UnixListener}; use std::thread; +use std::os::unix::io::{RawFd, IntoRawFd}; use time::Duration; use std::collections::HashMap; use getopts::Options; use chan_signal::Signal; use chan::{Sender, Receiver}; -use std::os::unix::io::{RawFd, IntoRawFd}; +use slog::DrainExt; #[derive(Clone, Debug, PartialEq)] @@ -67,6 +66,8 @@ struct EinConfig { ctrl_path: String, bind_slugs: Vec<String>, env_drops: Vec<String>, + verbose: bool, + syslog: bool, } struct EinState { @@ -76,6 +77,7 @@ struct EinState { timer: timer::Timer, timer_tx: Sender<TimerAction>, timer_rx: Receiver<TimerAction>, + log: slog::Logger, } #[derive(Copy, Clone, Debug, PartialEq)] @@ -116,6 +118,7 @@ struct Offspring { attempts: u64, timer_guard: Option<timer::Guard>, replaces: Option<u32>, + log: slog::Logger, } impl Offspring { @@ -127,12 +130,15 @@ impl Offspring { attempts: 0, timer_guard: None, replaces: None, + log: state.log.clone(), }; let pid = o.process.id(); + o.log = state.log.new(o!("child_pid" => pid,)); let t_tx = state.timer_tx.clone(); o.timer_guard = Some(state.timer.schedule_with_delay(state.cfg.childhood, move || { t_tx.send(TimerAction::CheckAlive(pid)); })); + info!(o.log, "spawned"); Ok(o) } @@ -195,7 +201,11 @@ impl Offspring { Signal::USR2 => nix::sys::signal::Signal::SIGUSR2, Signal::STOP => nix::sys::signal::Signal::SIGSTOP, Signal::CONT => nix::sys::signal::Signal::SIGCONT, - _ => { println!("Unexpected signal: {:?}", sig); return; }, + _ => { + warn!(self.log, "tried to send unexpected signal"; + "signal" => format!("{:?}", sig)); + return; + }, }; nix::sys::signal::kill(self.process.id() as i32, nix_sig).unwrap(); } @@ -210,7 +220,6 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) { let o = Offspring::spawn(&mut state).unwrap(); let pid = o.process.id(); brood.insert(pid, o); - println!("Spawned: {}", pid); } // Ugh, see: http://burntsushi.net/rustdoc/chan/macro.chan_select.html#failure-modes @@ -228,7 +237,7 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) { if let Some(mut o) = brood.remove(&pid) { if !state.cfg.manual_ack && o.state == OffspringState::Infancy { if o.is_active() { - println!("{} found to be alive", pid); + debug!(o.log, "found to be alive"); o.state = OffspringState::Healthy; if let Some(old_pid) = o.replaces { if let Some(old) = brood.get_mut(&old_pid) { @@ -237,9 +246,11 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) { } } } else if state.cfg.manual_ack && o.state == OffspringState::Infancy { - println!("{} didn't check in", pid); + warn!(o.log, "didn't ack in time, not healthy"; + "max_retries" => state.cfg.retries, + "attempts" => o.attempts); if o.attempts + 1 >= state.cfg.retries { - println!("Ran out of retries..."); + warn!(o.log, "ran out of retries"); } else { let mut successor = o.respawn(&mut state).unwrap(); successor.attempts = o.attempts + 1; @@ -247,7 +258,8 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) { } o.terminate(&mut state); } else { - println!("Unexpected CheckAlive state! pid={} state={:?}", o.process.id(), o.state); + warn!(o.log, "Unexpected CheckAlive state!"; + "state" => format!("{:?}", o.state)); } brood.insert(pid, o); }; @@ -346,11 +358,12 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) { match res { Ok(nix::sys::wait::WaitStatus::Exited(pid, _)) | Ok(nix::sys::wait::WaitStatus::Signaled(pid, _, _)) => { - println!("PID {} exited", pid); + info!(state.log, "child exited"; "child_pid" => pid); if let Some(mut o) = brood.remove(&(pid as u32)) { match o.state { OffspringState::Infancy => { if o.attempts + 1 >= state.cfg.retries { - println!("Ran out of retries..."); + warn!(state.log, "ran out of retries while spawning"; + "child_pid" => pid); } else { let mut successor = o.respawn(&mut state).unwrap(); successor.attempts = o.attempts + 1; @@ -364,21 +377,24 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) { }, OffspringState::Notified => (), OffspringState::Dead => { - println!("ERR: double-notified death on {}", pid); + error!(state.log, "double-notified death"; + "child_pid" => pid); } } }; }, Ok(nix::sys::wait::WaitStatus::StillAlive) => break, Ok(_) => { - println!("Some other thing we don't care about happened: {:?}", res); + info!(state.log, "SIGCHLD we don't care about"; + "value" => format!("{:?}", res)); }, Err(nix::Error::Sys(nix::Errno::ECHILD)) => { - println!("all children are dead, bailing"); + warn!(state.log, "all children are dead, bailing"); run = false; break; }, Err(e) => { - println!("waitpid err: {}", e); + error!(state.log, "waitpid error"; + "err" => format!("{:?}", e)); break; }, } @@ -398,39 +414,45 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) { brood.insert(successor.process.id(), successor); } }, Signal::TTIN | Signal::TTOU | Signal::USR1 | Signal::STOP | Signal::CONT => { - println!("Passing signal to children: {:?}", sig.unwrap()); + let sig = sig.unwrap(); + info!(state.log, "passing signal to children"; + "signal" => format!("{:?}", sig)); for (_, o) in brood.iter_mut() { - o.signal(sig.unwrap()); + o.signal(sig); } }, Signal::INT | Signal::USR2 => { - println!("Exiting! Gracefully shutting down children first, but won't wait."); + info!(state.log, + "Exiting! Gracefully shutting down children first, but won't wait"); for (_, o) in brood.iter_mut() { o.shutdown(&mut state); } run = false; }, Signal::TERM | Signal::QUIT => { - println!("Exiting! Killing children first, but won't wait."); + info!(state.log, + "Exiting! Killing children first, but won't wait."); for (_, o) in brood.iter_mut() { o.terminate(&mut state); } run = false; }, default => { - println!("Unexpected signal: {:?} (ignoring)", default); + info!(state.log, "Unexpected signal (ignoring)"; + "signal" => format!("{:?}", default)); }, }, } if !run { break; } } - println!("Reaping children... (count={})", brood.len()); + info!(state.log, "reaping children"; + "count" => brood.len()); for (pid, o) in brood.iter() { if o.is_active() { nix::sys::wait::waitpid(*pid as i32, Some(nix::sys::wait::WNOHANG)).ok(); } } - println!("Done."); + info!(state.log, "done, exiting"); } /* * * * * * * * Setup and CLI * * * * * * * */ @@ -450,6 +472,7 @@ fn main() { opts.optflag("h", "help", "print this help menu"); opts.optflag("", "version", "print the version"); opts.optflag("v", "verbose", "more debugging messages"); + opts.optflag("", "syslog", "enables syslog-ing (for WARN and above)"); opts.optflag("4", "ipv4-only", "only accept IPv4 connections"); opts.optflag("6", "ipv6-only", "only accept IPv6 connections"); opts.optflag("m", "manual", "manual (explicit) acknowledge mode"); @@ -497,6 +520,8 @@ fn main() { let ipv4_only = matches.opt_present("4"); let ipv6_only = matches.opt_present("6"); let manual_ack = matches.opt_present("m"); + let verbose = matches.opt_present("verbose"); + let syslog = matches.opt_present("syslog"); let program_and_args = if !matches.free.is_empty() { matches.free @@ -518,6 +543,8 @@ fn main() { ctrl_path: path_str, bind_slugs: bind_slugs, env_drops: env_drops, + verbose: verbose, + syslog: syslog, }; // Control socket first; not same scope as other state @@ -556,7 +583,8 @@ fn main() { }; //// Start Constrol Socket Thread - thread::spawn(move || ctrl_socket_serve(ctrl_listener, ctrl_req_tx)); + let ctrl_log = state.log.clone(); + thread::spawn(move || ctrl_socket_serve(ctrl_listener, ctrl_req_tx, ctrl_log)); //// State Event Loop shepard(state, signal_rx); @@ -566,6 +594,18 @@ fn main() { // Initializes config into state fn init(cfg: EinConfig, ctrl_req_rx: Receiver<CtrlRequest>) -> Result<EinState, String> { + //// Configure logging + let term_drain = slog::level_filter( + if cfg.verbose { slog::Level::Debug } else { slog::Level::Info }, + slog_term::streamer().async().auto_color().compact().build()); + let syslog_drain = slog::level_filter( + slog::Level::Warning, + slog_syslog::unix_3164(slog_syslog::Facility::LOG_DAEMON)); + // XXX: cfg.syslog + let log_root = slog::Logger::root( + slog::duplicate(term_drain, syslog_drain).ignore_err(), + o!("version" => env!("CARGO_PKG_VERSION"))); + // These will be tuples: (SocketAddr, SO_REUSEADDR, O_NONBLOCK) let sock_confs: Vec<(SocketAddr, bool, bool)> = cfg.bind_slugs.iter().map(|b| { let mut r = false; @@ -595,14 +635,6 @@ fn init(cfg: EinConfig, ctrl_req_rx: Receiver<CtrlRequest>) -> Result<EinState, (sock, r, n) }).collect(); - // Configure logging - let mut builder = env_logger::LogBuilder::new(); - builder.parse("INFO"); - if env::var("RUST_LOG").is_ok() { - builder.parse(&env::var("RUST_LOG").unwrap()); - } - builder.init().unwrap(); - let binds: Vec<(TcpListener, bool, bool)> = sock_confs.iter().map(|t| { let sa = t.0; let r = t.1; let n = t.2; // ugly (TcpListener::bind(sa).unwrap(), r, n) @@ -625,7 +657,9 @@ fn init(cfg: EinConfig, ctrl_req_rx: Receiver<CtrlRequest>) -> Result<EinState, if n { nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::O_NONBLOCK)).unwrap(); } - println!("fd={} FD_CLOEXEC={}", fd, nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFD).unwrap()); + debug!(log_root, "bound socket"; + "fd" => fd, + "FD_CLOEXEC" => nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFD).unwrap()); fd }).collect(); @@ -646,6 +680,7 @@ fn init(cfg: EinConfig, ctrl_req_rx: Receiver<CtrlRequest>) -> Result<EinState, timer: timer, timer_tx: timer_tx, timer_rx: timer_rx, + log: log_root, }) } @@ -664,13 +699,13 @@ const CTRL_SHELL_USAGE: &'static str = r#"Command Listing: version prints (master) version "#; -fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>) { +fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>, log: slog::Logger) { let reader = BufReader::new(&stream); let mut writer = BufWriter::new(&stream); for rawline in reader.lines() { let rawline = rawline.unwrap(); - println!("Got line: {}", rawline); + debug!(log, "got raw command"; "line" => rawline); if rawline.len() == 0 { continue; } @@ -753,16 +788,19 @@ fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>) { stream.shutdown(std::net::Shutdown::Both).unwrap(); } -fn ctrl_socket_serve(listener: UnixListener, ctrl_req_tx: Sender<CtrlRequest>) { +fn ctrl_socket_serve(listener: UnixListener, ctrl_req_tx: Sender<CtrlRequest>, log: slog::Logger) { for conn in listener.incoming() { match conn{ Ok(conn) => { let tx = ctrl_req_tx.clone(); - thread::spawn(move || ctrl_socket_handle(conn, tx)); + let conn_log = log.new(o!( + "client" => format!("{:?}", conn))); + info!(conn_log, "accepted connection"); + thread::spawn(move || ctrl_socket_handle(conn, tx, conn_log)); }, Err(err) => { // TODO - println!("control socket err: {}", err); + error!(log, "control socket err: {}", err); break; }, } |