aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs292
1 files changed, 172 insertions, 120 deletions
diff --git a/src/main.rs b/src/main.rs
index 6f88878..2e85ab5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -52,18 +52,53 @@ use chan_signal::Signal;
use chan::{Sender, Receiver};
use std::os::unix::io::{RawFd, IntoRawFd};
-// TODO: split this into read-only config and mutable state
+
+#[derive(Clone, Debug, PartialEq)]
struct EinConfig {
+ program: String,
+ program_args: Vec<String>,
+ count: u64,
childhood: Duration,
graceperiod: Duration,
- manual_ack: bool,
retries: u64,
- count: u64,
- bind_fds: Vec<RawFd>,
- cmd: Command,
ipv4_only: bool,
ipv6_only: bool,
+ manual_ack: bool,
+ ctrl_path: String,
+ bind_slugs: Vec<String>,
+}
+
+struct EinState {
+ cmd: Command,
ctrl_req_rx: Receiver<CtrlRequest>,
+ cfg: EinConfig,
+ timer: timer::Timer,
+ timer_tx: Sender<TimerAction>,
+ timer_rx: Receiver<TimerAction>,
+}
+
+#[derive(Copy, Clone, Debug, PartialEq)]
+enum TimerAction {
+ CheckAlive(u32),
+ CheckTerminated(u32),
+ CheckShutdown(u32),
+}
+
+#[derive(Copy, Clone, Debug, PartialEq)]
+enum CtrlAction {
+ Increment,
+ Decrement,
+ ManualAck(u32),
+ SigAll(Signal),
+ ShutdownAll,
+ UpgradeAll,
+ Status,
+}
+
+#[derive(Clone, Debug, PartialEq)]
+struct CtrlRequest {
+ action: CtrlAction,
+ tx: Sender<String>,
}
#[derive(Copy, Clone, Debug, PartialEq)]
@@ -84,23 +119,24 @@ struct Offspring {
impl Offspring {
- pub fn spawn(cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender<TimerAction>) -> Result<Offspring, String> {
+ pub fn spawn(state: &mut EinState) -> Result<Offspring, String> {
let mut o = Offspring {
state: OffspringState::Infancy,
- process: cfg.cmd.spawn().expect("error spawning"),
+ process: state.cmd.spawn().expect("error spawning"),
attempts: 0,
timer_guard: None,
replaces: None,
};
let pid = o.process.id();
- o.timer_guard = Some(timer.schedule_with_delay(cfg.childhood, move || {
+ let t_tx = state.timer_tx.clone();
+ o.timer_guard = Some(state.timer.schedule_with_delay(state.cfg.childhood, move || {
t_tx.send(TimerAction::CheckAlive(pid));
}));
Ok(o)
}
- pub fn respawn(&mut self, cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender<TimerAction>) -> Result<Offspring, String> {
- let mut successor = try!(Offspring::spawn(cfg, timer, t_tx));
+ pub fn respawn(&mut self, state: &mut EinState) -> Result<Offspring, String> {
+ let mut successor = try!(Offspring::spawn(state));
successor.replaces = Some(self.process.id());
Ok(successor)
}
@@ -120,22 +156,24 @@ impl Offspring {
self.state = OffspringState::Dead;
}
- pub fn terminate(&mut self, cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender<TimerAction>) {
+ pub fn terminate(&mut self, state: &mut EinState) {
if !self.is_active() { return; }
self.signal(Signal::TERM);
self.state = OffspringState::Notified;
let pid = self.process.id();
- self.timer_guard = Some(timer.schedule_with_delay(cfg.graceperiod, move || {
+ let t_tx = state.timer_tx.clone();
+ self.timer_guard = Some(state.timer.schedule_with_delay(state.cfg.graceperiod, move || {
t_tx.send(TimerAction::CheckTerminated(pid));
}));
}
- pub fn shutdown(&mut self, cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender<TimerAction>) {
+ pub fn shutdown(&mut self, state: &mut EinState) {
if !self.is_active() { return; }
self.signal(Signal::USR2);
self.state = OffspringState::Notified;
let pid = self.process.id();
- self.timer_guard = Some(timer.schedule_with_delay(cfg.graceperiod , move || {
+ let t_tx = state.timer_tx.clone();
+ self.timer_guard = Some(state.timer.schedule_with_delay(state.cfg.graceperiod , move || {
t_tx.send(TimerAction::CheckShutdown(pid));
}));
}
@@ -162,48 +200,21 @@ impl Offspring {
}
}
-#[derive(Copy, Clone, Debug, PartialEq)]
-enum TimerAction {
- CheckAlive(u32),
- CheckTerminated(u32),
- CheckShutdown(u32),
-}
-
-#[derive(Copy, Clone, Debug, PartialEq)]
-enum CtrlAction {
- Increment,
- Decrement,
- ManualAck(u32),
- SigAll(Signal),
- ShutdownAll,
- UpgradeAll,
- Status,
-}
-
-#[derive(Clone, Debug, PartialEq)]
-struct CtrlRequest {
- action: CtrlAction,
- tx: Sender<String>,
-}
-
-// This is the main event loop
-fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
-
- //// create timer
- let mut timer = timer::Timer::new();
- let (timer_tx, timer_rx): (Sender<TimerAction>, Receiver<TimerAction>) = chan::async();
+/* * * * * * * * Main Event Loop * * * * * * * */
+fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
//// birth the initial set of offspring
let mut brood: HashMap<u32, Offspring> = HashMap::new();
- for _ in 0..cfg.count {
- let o = Offspring::spawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap();
+ for _ in 0..state.cfg.count {
+ let o = Offspring::spawn(&mut state).unwrap();
let pid = o.process.id();
brood.insert(pid, o);
println!("Spawned: {}", pid);
}
// Ugh, see: http://burntsushi.net/rustdoc/chan/macro.chan_select.html#failure-modes
- let ctrl_req_rx = cfg.ctrl_req_rx.clone();
+ let ctrl_req_rx = state.ctrl_req_rx.clone();
+ let timer_rx = state.timer_rx.clone();
//// infinite select() loop over timers, signals
let mut run = true;
@@ -214,26 +225,26 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
// Need to move 'o' out of HashMap here so we can mutate
// the map in other ways
if let Some(mut o) = brood.remove(&pid) {
- if !cfg.manual_ack && o.state == OffspringState::Infancy {
+ if !state.cfg.manual_ack && o.state == OffspringState::Infancy {
if o.is_active() {
println!("{} found to be alive", pid);
o.state = OffspringState::Healthy;
if let Some(old_pid) = o.replaces {
if let Some(old) = brood.get_mut(&old_pid) {
- old.shutdown(&mut cfg, &mut timer, timer_tx.clone());
+ old.shutdown(&mut state);
}
}
}
- } else if cfg.manual_ack && o.state == OffspringState::Infancy {
+ } else if state.cfg.manual_ack && o.state == OffspringState::Infancy {
println!("{} didn't check in", pid);
- if o.attempts + 1 >= cfg.retries {
+ if o.attempts + 1 >= state.cfg.retries {
println!("Ran out of retries...");
} else {
- let mut successor = o.respawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap();
+ let mut successor = o.respawn(&mut state).unwrap();
successor.attempts = o.attempts + 1;
brood.insert(successor.process.id(), successor);
}
- o.terminate(&mut cfg, &mut timer, timer_tx.clone());
+ o.terminate(&mut state);
} else {
println!("Unexpected CheckAlive state! pid={} state={:?}", o.process.id(), o.state);
}
@@ -243,7 +254,7 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
TimerAction::CheckShutdown(pid) => {
if let Some(o) = brood.get_mut(&pid) {
if o.is_active() {
- o.terminate(&mut cfg, &mut timer, timer_tx.clone());
+ o.terminate(&mut state);
}
}
},
@@ -258,24 +269,25 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
ctrl_req_rx.recv() -> maybe_req =>
if let Some(req) = maybe_req { match req.action {
CtrlAction::Increment => {
- let o = Offspring::spawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap();
+ let o = Offspring::spawn(&mut state).unwrap();
let pid = o.process.id();
brood.insert(pid, o);
- req.tx.send(format!("Spawned! Went from {} to {}", cfg.count, cfg.count+1));
- cfg.count = cfg.count+1;
+ req.tx.send(format!("Spawned! Went from {} to {}", state.cfg.count, state.cfg.count+1));
+ state.cfg.count = state.cfg.count+1;
},
CtrlAction::Decrement => {
- if cfg.count <= 0 {
+ if state.cfg.count <= 0 {
req.tx.send(format!("Already at count=0, no-op"));
continue;
}
let mut done = false;
for (_, o) in brood.iter_mut() {
if o.is_active() {
- o.shutdown(&mut cfg, &mut timer, timer_tx.clone());
- req.tx.send(format!("Notified! Went from {} to {}", cfg.count, cfg.count-1));
- cfg.count = cfg.count-1;
+ o.shutdown(&mut state);
+ req.tx.send(format!("Notified! Went from {} to {}", state.cfg.count, state.cfg.count-1));
+ state.cfg.count = state.cfg.count-1;
done = true;
+ break;
}
}
if !done {
@@ -292,7 +304,7 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
let mut pid_list = vec![];
for (pid, o) in brood.iter_mut() {
if o.is_active() {
- o.shutdown(&mut cfg, &mut timer, timer_tx.clone());
+ o.shutdown(&mut state);
pid_list.push(pid);
}
}
@@ -306,7 +318,7 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
if !o.is_active() {
continue;
}
- o.respawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap()
+ o.respawn(&mut state).unwrap()
};
successor.attempts = 0;
brood.insert(successor.process.id(), successor);
@@ -336,16 +348,16 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
println!("PID {} exited", pid);
if let Some(mut o) = brood.remove(&(pid as u32)) { match o.state {
OffspringState::Infancy => {
- if o.attempts + 1 >= cfg.retries {
+ if o.attempts + 1 >= state.cfg.retries {
println!("Ran out of retries...");
} else {
- let mut successor = o.respawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap();
+ let mut successor = o.respawn(&mut state).unwrap();
successor.attempts = o.attempts + 1;
brood.insert(successor.process.id(), successor);
}
},
OffspringState::Healthy => {
- let mut successor = o.respawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap();
+ let mut successor = o.respawn(&mut state).unwrap();
successor.replaces = Some(pid as u32);
brood.insert(successor.process.id(), successor);
},
@@ -379,7 +391,7 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
if !o.is_active() {
continue;
}
- o.respawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap()
+ o.respawn(&mut state).unwrap()
};
successor.attempts = 0;
brood.insert(successor.process.id(), successor);
@@ -392,14 +404,14 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
Signal::INT | Signal::USR2 => {
println!("Exiting! Gracefully shutting down children first, but won't wait.");
for (_, o) in brood.iter_mut() {
- o.shutdown(&mut cfg, &mut timer, timer_tx.clone());
+ o.shutdown(&mut state);
}
run = false;
},
Signal::TERM | Signal::QUIT => {
println!("Exiting! Killing children first, but won't wait.");
for (_, o) in brood.iter_mut() {
- o.terminate(&mut cfg, &mut timer, timer_tx.clone());
+ o.terminate(&mut state);
}
run = false;
},
@@ -418,8 +430,10 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
println!("Done.");
}
+/* * * * * * * * Setup and CLI * * * * * * * */
+
fn print_usage(opts: Options) {
- let brief = "usage:\teinhyrningsins [options] program";
+ let brief = "usage:\teinhyrningsins [options] [--] program [program_args]";
println!("");
print!("{}", opts.usage(&brief));
}
@@ -438,10 +452,12 @@ fn main() {
opts.optflag("m", "manual", "manual (explicit) acknowledge mode");
opts.optopt("n", "number", "how many program copies to spawn", "COUNT");
opts.optmulti("b", "bind", "socket(s) to bind to", "ADDR");
+ opts.optopt("d", "socket-path", "where to look for control socket (default: /tmp/einhorn.sock)", "PATH");
+ opts.optopt("r", "retries", "how many times to attempt spawning", "RETRIES");
let matches = match opts.parse(&args[1..]) {
Ok(m) => { m }
- Err(f) => { println!("{}", f.to_string()); print_usage(opts); exit(-1); }
+ Err(f) => { println!("{}\n", f.to_string()); print_usage(opts); exit(-1); }
};
if matches.opt_present("help") {
@@ -460,27 +476,22 @@ fn main() {
}
//// Parse Configuration
- let (ctrl_req_tx, ctrl_req_rx): (Sender<CtrlRequest>, Receiver<CtrlRequest>) = chan::async();
- let mut cfg = EinConfig{
- count: 1,
- childhood: Duration::seconds(3),
- graceperiod: Duration::seconds(3),
- retries: 3,
- bind_fds: vec![],
- ipv4_only: matches.opt_present("4"),
- ipv6_only: matches.opt_present("6"),
- manual_ack: matches.opt_present("m"),
- cmd: Command::new(""),
- ctrl_req_rx: ctrl_req_rx,
- };
-
let path_str = matches.opt_str("socket-path").unwrap_or("/tmp/einhorn.sock".to_string());
- if let Some(n) = matches.opt_str("number") {
- cfg.count = u64::from_str(&n).expect("number arg should be an integer");
- }
+ let count = match matches.opt_str("number") {
+ Some(n) => u64::from_str(&n).expect("number arg should be an integer"),
+ None => 1,
+ };
+
+ let retries = match matches.opt_str("retries") {
+ Some(n) => u64::from_str(&n).expect("retries arg should be an integer"),
+ None => 1,
+ };
let bind_slugs = matches.opt_strs("bind");
+ let ipv4_only = matches.opt_present("4");
+ let ipv6_only = matches.opt_present("6");
+ let manual_ack = matches.opt_present("m");
let program_and_args = if !matches.free.is_empty() {
matches.free
@@ -488,25 +499,73 @@ fn main() {
println!("Missing program to run (try --help)");
exit(-1);
};
+ let mut program_and_args = program_and_args.into_iter();
+ let cfg = EinConfig {
+ program: program_and_args.next().unwrap(),
+ program_args: program_and_args.collect(),
+ count: count,
+ childhood: Duration::seconds(3),
+ graceperiod: Duration::seconds(3),
+ retries: retries,
+ ipv4_only: ipv4_only,
+ ipv6_only: ipv6_only,
+ manual_ack: manual_ack,
+ ctrl_path: path_str,
+ bind_slugs: bind_slugs,
+ };
- //// Bind Sockets
-
- // Control socket first
+ // Control socket first; not same scope as other state
// XXX: handle this more gracefully (per-process)
- let ctrl_path = Path::new(&path_str);
+ let tmp = cfg.ctrl_path.clone();
+ let ctrl_path = Path::new(&tmp);
if ctrl_path.exists() {
fs::remove_file(&ctrl_path).unwrap();
}
+
println!("Binding control socket to: {:?}", ctrl_path);
let ctrl_listener = UnixListener::bind(ctrl_path).unwrap();
// XXX: set mode/permissions/owner?
+ let (ctrl_req_tx, ctrl_req_rx): (Sender<CtrlRequest>, Receiver<CtrlRequest>) = chan::async();
+
+ //// Listen for signals (before any fork())
+ println!("Registering signal handlers...");
+ let signal_rx = chan_signal::notify(&[Signal::HUP,
+ Signal::INT,
+ Signal::QUIT,
+ Signal::TERM,
+ Signal::PIPE,
+ Signal::ALRM,
+ Signal::CHLD, // NB: PR has been submitted
+ Signal::TTIN,
+ Signal::TTOU,
+ Signal::USR1,
+ Signal::USR2,
+ Signal::STOP,
+ Signal::CONT]);
+
+ let state = match init(cfg, ctrl_req_rx) {
+ Ok(s) => s,
+ Err(e) => { println!("{}", e); exit(-1); },
+ };
+
+ //// Start Constrol Socket Thread
+ thread::spawn(move || ctrl_socket_serve(ctrl_listener, ctrl_req_tx));
+
+ //// State Event Loop
+ shepard(state, signal_rx);
+ exit(0);
+}
+
+// Initializes config into state
+fn init(cfg: EinConfig, ctrl_req_rx: Receiver<CtrlRequest>) -> Result<EinState, String> {
+
// These will be tuples: (SocketAddr, SO_REUSEADDR, O_NONBLOCK)
- let sock_confs: Vec<(SocketAddr, bool, bool)> = bind_slugs.iter().map(|b| {
+ let sock_confs: Vec<(SocketAddr, bool, bool)> = cfg.bind_slugs.iter().map(|b| {
let mut r = false;
let mut n = false;
let mut addr_chunks = b.split(',');
- let sock_str = addr_chunks.next().unwrap();
+ let sock_str = addr_chunks.next().unwrap(); // safe
let mut sock_addrs = sock_str.to_socket_addrs().unwrap();
// ugly
let sock = if cfg.ipv4_only {
@@ -530,6 +589,7 @@ fn main() {
(sock, r, n)
}).collect();
+ // Configure logging
let mut builder = env_logger::LogBuilder::new();
builder.parse("INFO");
if env::var("RUST_LOG").is_ok() {
@@ -542,10 +602,10 @@ fn main() {
(TcpListener::bind(sa).unwrap(), r, n)
}).collect();
- let mut cmd = Command::new(&program_and_args[0]);
- cmd.args(&program_and_args[1..]);
+ let mut cmd = Command::new(cfg.program.clone());
+ cmd.args(&cfg.program_args);
- cfg.bind_fds = binds.into_iter().map(|t| {
+ let bind_fds: Vec<RawFd> = binds.into_iter().map(|t| {
let b = t.0; let r = t.1; let n = t.2; // ugly
let orig_fd = b.into_raw_fd();
// Duplicate, which also clears the CLOEXEC flag
@@ -560,36 +620,28 @@ fn main() {
fd
}).collect();
- cmd.env("EINHORN_FD_COUNT", cfg.bind_fds.len().to_string());
+ cmd.env("EINHORN_FD_COUNT", bind_fds.len().to_string());
// This iterator destroys the TcpListeners
- for (i, fd) in cfg.bind_fds.iter().enumerate() {
+ for (i, fd) in bind_fds.iter().enumerate() {
cmd.env(format!("EINHORN_FD_{}", i), fd.to_string());
}
- cfg.cmd = cmd;
- //// Listen for signals (before any fork())
- println!("Registering signal handlers...");
- let signal_rx = chan_signal::notify(&[Signal::HUP,
- Signal::INT,
- Signal::QUIT,
- Signal::TERM,
- Signal::PIPE,
- Signal::ALRM,
- Signal::CHLD, // NB: PR has been submitted
- Signal::TTIN,
- Signal::TTOU,
- Signal::USR1,
- Signal::USR2,
- Signal::STOP,
- Signal::CONT]);
-
- //// Start Constrol Socket Thread
- thread::spawn(move || ctrl_socket_serve(ctrl_listener, ctrl_req_tx));
+ // create timer thread
+ let timer = timer::Timer::new();
+ let (timer_tx, timer_rx): (Sender<TimerAction>, Receiver<TimerAction>) = chan::async();
- shepard(cfg, signal_rx);
- exit(0);
+ Ok(EinState {
+ cmd: cmd,
+ ctrl_req_rx: ctrl_req_rx,
+ cfg: cfg,
+ timer: timer,
+ timer_tx: timer_tx,
+ timer_rx: timer_rx,
+ })
}
+/* * * * * * * * Control Socket Server * * * * * * * */
+
const CTRL_SHELL_USAGE: &'static str = r#"\"Command Listing:
inc increments number of children