aboutsummaryrefslogtreecommitdiffstats
path: root/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs120
1 files changed, 79 insertions, 41 deletions
diff --git a/src/main.rs b/src/main.rs
index eb3dc9c..0120599 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,18 +16,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#[macro_use]
-extern crate chan;
+#[macro_use] extern crate chan;
+#[macro_use] extern crate slog;
+extern crate slog_syslog;
+extern crate slog_term;
extern crate json;
-
extern crate getopts;
-extern crate log;
-extern crate env_logger;
extern crate nix;
extern crate timer;
extern crate time;
extern crate chan_signal;
-extern crate url;
use std::io::prelude::*;
use std::io::{BufReader, BufWriter};
@@ -44,13 +42,14 @@ use std::net::TcpListener;
use std::net::ToSocketAddrs;
use std::os::unix::net::{UnixStream, UnixListener};
use std::thread;
+use std::os::unix::io::{RawFd, IntoRawFd};
use time::Duration;
use std::collections::HashMap;
use getopts::Options;
use chan_signal::Signal;
use chan::{Sender, Receiver};
-use std::os::unix::io::{RawFd, IntoRawFd};
+use slog::DrainExt;
#[derive(Clone, Debug, PartialEq)]
@@ -67,6 +66,8 @@ struct EinConfig {
ctrl_path: String,
bind_slugs: Vec<String>,
env_drops: Vec<String>,
+ verbose: bool,
+ syslog: bool,
}
struct EinState {
@@ -76,6 +77,7 @@ struct EinState {
timer: timer::Timer,
timer_tx: Sender<TimerAction>,
timer_rx: Receiver<TimerAction>,
+ log: slog::Logger,
}
#[derive(Copy, Clone, Debug, PartialEq)]
@@ -116,6 +118,7 @@ struct Offspring {
attempts: u64,
timer_guard: Option<timer::Guard>,
replaces: Option<u32>,
+ log: slog::Logger,
}
impl Offspring {
@@ -127,12 +130,15 @@ impl Offspring {
attempts: 0,
timer_guard: None,
replaces: None,
+ log: state.log.clone(),
};
let pid = o.process.id();
+ o.log = state.log.new(o!("child_pid" => pid,));
let t_tx = state.timer_tx.clone();
o.timer_guard = Some(state.timer.schedule_with_delay(state.cfg.childhood, move || {
t_tx.send(TimerAction::CheckAlive(pid));
}));
+ info!(o.log, "spawned");
Ok(o)
}
@@ -195,7 +201,11 @@ impl Offspring {
Signal::USR2 => nix::sys::signal::Signal::SIGUSR2,
Signal::STOP => nix::sys::signal::Signal::SIGSTOP,
Signal::CONT => nix::sys::signal::Signal::SIGCONT,
- _ => { println!("Unexpected signal: {:?}", sig); return; },
+ _ => {
+ warn!(self.log, "tried to send unexpected signal";
+ "signal" => format!("{:?}", sig));
+ return;
+ },
};
nix::sys::signal::kill(self.process.id() as i32, nix_sig).unwrap();
}
@@ -210,7 +220,6 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
let o = Offspring::spawn(&mut state).unwrap();
let pid = o.process.id();
brood.insert(pid, o);
- println!("Spawned: {}", pid);
}
// Ugh, see: http://burntsushi.net/rustdoc/chan/macro.chan_select.html#failure-modes
@@ -228,7 +237,7 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
if let Some(mut o) = brood.remove(&pid) {
if !state.cfg.manual_ack && o.state == OffspringState::Infancy {
if o.is_active() {
- println!("{} found to be alive", pid);
+ debug!(o.log, "found to be alive");
o.state = OffspringState::Healthy;
if let Some(old_pid) = o.replaces {
if let Some(old) = brood.get_mut(&old_pid) {
@@ -237,9 +246,11 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
}
}
} else if state.cfg.manual_ack && o.state == OffspringState::Infancy {
- println!("{} didn't check in", pid);
+ warn!(o.log, "didn't ack in time, not healthy";
+ "max_retries" => state.cfg.retries,
+ "attempts" => o.attempts);
if o.attempts + 1 >= state.cfg.retries {
- println!("Ran out of retries...");
+ warn!(o.log, "ran out of retries");
} else {
let mut successor = o.respawn(&mut state).unwrap();
successor.attempts = o.attempts + 1;
@@ -247,7 +258,8 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
}
o.terminate(&mut state);
} else {
- println!("Unexpected CheckAlive state! pid={} state={:?}", o.process.id(), o.state);
+ warn!(o.log, "Unexpected CheckAlive state!";
+ "state" => format!("{:?}", o.state));
}
brood.insert(pid, o);
};
@@ -346,11 +358,12 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
match res {
Ok(nix::sys::wait::WaitStatus::Exited(pid, _)) |
Ok(nix::sys::wait::WaitStatus::Signaled(pid, _, _)) => {
- println!("PID {} exited", pid);
+ info!(state.log, "child exited"; "child_pid" => pid);
if let Some(mut o) = brood.remove(&(pid as u32)) { match o.state {
OffspringState::Infancy => {
if o.attempts + 1 >= state.cfg.retries {
- println!("Ran out of retries...");
+ warn!(state.log, "ran out of retries while spawning";
+ "child_pid" => pid);
} else {
let mut successor = o.respawn(&mut state).unwrap();
successor.attempts = o.attempts + 1;
@@ -364,21 +377,24 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
},
OffspringState::Notified => (),
OffspringState::Dead => {
- println!("ERR: double-notified death on {}", pid);
+ error!(state.log, "double-notified death";
+ "child_pid" => pid);
}
} };
},
Ok(nix::sys::wait::WaitStatus::StillAlive) => break,
Ok(_) => {
- println!("Some other thing we don't care about happened: {:?}", res);
+ info!(state.log, "SIGCHLD we don't care about";
+ "value" => format!("{:?}", res));
},
Err(nix::Error::Sys(nix::Errno::ECHILD)) => {
- println!("all children are dead, bailing");
+ warn!(state.log, "all children are dead, bailing");
run = false;
break;
},
Err(e) => {
- println!("waitpid err: {}", e);
+ error!(state.log, "waitpid error";
+ "err" => format!("{:?}", e));
break;
},
}
@@ -398,39 +414,45 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
brood.insert(successor.process.id(), successor);
} },
Signal::TTIN | Signal::TTOU | Signal::USR1 | Signal::STOP | Signal::CONT => {
- println!("Passing signal to children: {:?}", sig.unwrap());
+ let sig = sig.unwrap();
+ info!(state.log, "passing signal to children";
+ "signal" => format!("{:?}", sig));
for (_, o) in brood.iter_mut() {
- o.signal(sig.unwrap());
+ o.signal(sig);
} },
Signal::INT | Signal::USR2 => {
- println!("Exiting! Gracefully shutting down children first, but won't wait.");
+ info!(state.log,
+ "Exiting! Gracefully shutting down children first, but won't wait");
for (_, o) in brood.iter_mut() {
o.shutdown(&mut state);
}
run = false;
},
Signal::TERM | Signal::QUIT => {
- println!("Exiting! Killing children first, but won't wait.");
+ info!(state.log,
+ "Exiting! Killing children first, but won't wait.");
for (_, o) in brood.iter_mut() {
o.terminate(&mut state);
}
run = false;
},
default => {
- println!("Unexpected signal: {:?} (ignoring)", default);
+ info!(state.log, "Unexpected signal (ignoring)";
+ "signal" => format!("{:?}", default));
},
},
}
if !run { break; }
}
- println!("Reaping children... (count={})", brood.len());
+ info!(state.log, "reaping children";
+ "count" => brood.len());
for (pid, o) in brood.iter() {
if o.is_active() {
nix::sys::wait::waitpid(*pid as i32, Some(nix::sys::wait::WNOHANG)).ok();
}
}
- println!("Done.");
+ info!(state.log, "done, exiting");
}
/* * * * * * * * Setup and CLI * * * * * * * */
@@ -450,6 +472,7 @@ fn main() {
opts.optflag("h", "help", "print this help menu");
opts.optflag("", "version", "print the version");
opts.optflag("v", "verbose", "more debugging messages");
+ opts.optflag("", "syslog", "enables syslog-ing (for WARN and above)");
opts.optflag("4", "ipv4-only", "only accept IPv4 connections");
opts.optflag("6", "ipv6-only", "only accept IPv6 connections");
opts.optflag("m", "manual", "manual (explicit) acknowledge mode");
@@ -497,6 +520,8 @@ fn main() {
let ipv4_only = matches.opt_present("4");
let ipv6_only = matches.opt_present("6");
let manual_ack = matches.opt_present("m");
+ let verbose = matches.opt_present("verbose");
+ let syslog = matches.opt_present("syslog");
let program_and_args = if !matches.free.is_empty() {
matches.free
@@ -518,6 +543,8 @@ fn main() {
ctrl_path: path_str,
bind_slugs: bind_slugs,
env_drops: env_drops,
+ verbose: verbose,
+ syslog: syslog,
};
// Control socket first; not same scope as other state
@@ -556,7 +583,8 @@ fn main() {
};
//// Start Constrol Socket Thread
- thread::spawn(move || ctrl_socket_serve(ctrl_listener, ctrl_req_tx));
+ let ctrl_log = state.log.clone();
+ thread::spawn(move || ctrl_socket_serve(ctrl_listener, ctrl_req_tx, ctrl_log));
//// State Event Loop
shepard(state, signal_rx);
@@ -566,6 +594,18 @@ fn main() {
// Initializes config into state
fn init(cfg: EinConfig, ctrl_req_rx: Receiver<CtrlRequest>) -> Result<EinState, String> {
+ //// Configure logging
+ let term_drain = slog::level_filter(
+ if cfg.verbose { slog::Level::Debug } else { slog::Level::Info },
+ slog_term::streamer().async().auto_color().compact().build());
+ let syslog_drain = slog::level_filter(
+ slog::Level::Warning,
+ slog_syslog::unix_3164(slog_syslog::Facility::LOG_DAEMON));
+ // XXX: cfg.syslog
+ let log_root = slog::Logger::root(
+ slog::duplicate(term_drain, syslog_drain).ignore_err(),
+ o!("version" => env!("CARGO_PKG_VERSION")));
+
// These will be tuples: (SocketAddr, SO_REUSEADDR, O_NONBLOCK)
let sock_confs: Vec<(SocketAddr, bool, bool)> = cfg.bind_slugs.iter().map(|b| {
let mut r = false;
@@ -595,14 +635,6 @@ fn init(cfg: EinConfig, ctrl_req_rx: Receiver<CtrlRequest>) -> Result<EinState,
(sock, r, n)
}).collect();
- // Configure logging
- let mut builder = env_logger::LogBuilder::new();
- builder.parse("INFO");
- if env::var("RUST_LOG").is_ok() {
- builder.parse(&env::var("RUST_LOG").unwrap());
- }
- builder.init().unwrap();
-
let binds: Vec<(TcpListener, bool, bool)> = sock_confs.iter().map(|t| {
let sa = t.0; let r = t.1; let n = t.2; // ugly
(TcpListener::bind(sa).unwrap(), r, n)
@@ -625,7 +657,9 @@ fn init(cfg: EinConfig, ctrl_req_rx: Receiver<CtrlRequest>) -> Result<EinState,
if n {
nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::O_NONBLOCK)).unwrap();
}
- println!("fd={} FD_CLOEXEC={}", fd, nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFD).unwrap());
+ debug!(log_root, "bound socket";
+ "fd" => fd,
+ "FD_CLOEXEC" => nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFD).unwrap());
fd
}).collect();
@@ -646,6 +680,7 @@ fn init(cfg: EinConfig, ctrl_req_rx: Receiver<CtrlRequest>) -> Result<EinState,
timer: timer,
timer_tx: timer_tx,
timer_rx: timer_rx,
+ log: log_root,
})
}
@@ -664,13 +699,13 @@ const CTRL_SHELL_USAGE: &'static str = r#"Command Listing:
version prints (master) version
"#;
-fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>) {
+fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>, log: slog::Logger) {
let reader = BufReader::new(&stream);
let mut writer = BufWriter::new(&stream);
for rawline in reader.lines() {
let rawline = rawline.unwrap();
- println!("Got line: {}", rawline);
+ debug!(log, "got raw command"; "line" => rawline);
if rawline.len() == 0 {
continue;
}
@@ -753,16 +788,19 @@ fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>) {
stream.shutdown(std::net::Shutdown::Both).unwrap();
}
-fn ctrl_socket_serve(listener: UnixListener, ctrl_req_tx: Sender<CtrlRequest>) {
+fn ctrl_socket_serve(listener: UnixListener, ctrl_req_tx: Sender<CtrlRequest>, log: slog::Logger) {
for conn in listener.incoming() {
match conn{
Ok(conn) => {
let tx = ctrl_req_tx.clone();
- thread::spawn(move || ctrl_socket_handle(conn, tx));
+ let conn_log = log.new(o!(
+ "client" => format!("{:?}", conn)));
+ info!(conn_log, "accepted connection");
+ thread::spawn(move || ctrl_socket_handle(conn, tx, conn_log));
},
Err(err) => {
// TODO
- println!("control socket err: {}", err);
+ error!(log, "control socket err: {}", err);
break;
},
}