From f24d5897071e8792402fcb2320fb4f1c1ec128ff Mon Sep 17 00:00:00 2001 From: bnewbold Date: Wed, 12 Oct 2016 22:32:16 -0700 Subject: yooje refactor --- src/main.rs | 292 +++++++++++++++++++++++++++++++++++------------------------- 1 file changed, 172 insertions(+), 120 deletions(-) diff --git a/src/main.rs b/src/main.rs index 6f88878..2e85ab5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,18 +52,53 @@ use chan_signal::Signal; use chan::{Sender, Receiver}; use std::os::unix::io::{RawFd, IntoRawFd}; -// TODO: split this into read-only config and mutable state + +#[derive(Clone, Debug, PartialEq)] struct EinConfig { + program: String, + program_args: Vec, + count: u64, childhood: Duration, graceperiod: Duration, - manual_ack: bool, retries: u64, - count: u64, - bind_fds: Vec, - cmd: Command, ipv4_only: bool, ipv6_only: bool, + manual_ack: bool, + ctrl_path: String, + bind_slugs: Vec, +} + +struct EinState { + cmd: Command, ctrl_req_rx: Receiver, + cfg: EinConfig, + timer: timer::Timer, + timer_tx: Sender, + timer_rx: Receiver, +} + +#[derive(Copy, Clone, Debug, PartialEq)] +enum TimerAction { + CheckAlive(u32), + CheckTerminated(u32), + CheckShutdown(u32), +} + +#[derive(Copy, Clone, Debug, PartialEq)] +enum CtrlAction { + Increment, + Decrement, + ManualAck(u32), + SigAll(Signal), + ShutdownAll, + UpgradeAll, + Status, +} + +#[derive(Clone, Debug, PartialEq)] +struct CtrlRequest { + action: CtrlAction, + tx: Sender, } #[derive(Copy, Clone, Debug, PartialEq)] @@ -84,23 +119,24 @@ struct Offspring { impl Offspring { - pub fn spawn(cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender) -> Result { + pub fn spawn(state: &mut EinState) -> Result { let mut o = Offspring { state: OffspringState::Infancy, - process: cfg.cmd.spawn().expect("error spawning"), + process: state.cmd.spawn().expect("error spawning"), attempts: 0, timer_guard: None, replaces: None, }; let pid = o.process.id(); - o.timer_guard = Some(timer.schedule_with_delay(cfg.childhood, move || { + let t_tx = state.timer_tx.clone(); + o.timer_guard = Some(state.timer.schedule_with_delay(state.cfg.childhood, move || { t_tx.send(TimerAction::CheckAlive(pid)); })); Ok(o) } - pub fn respawn(&mut self, cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender) -> Result { - let mut successor = try!(Offspring::spawn(cfg, timer, t_tx)); + pub fn respawn(&mut self, state: &mut EinState) -> Result { + let mut successor = try!(Offspring::spawn(state)); successor.replaces = Some(self.process.id()); Ok(successor) } @@ -120,22 +156,24 @@ impl Offspring { self.state = OffspringState::Dead; } - pub fn terminate(&mut self, cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender) { + pub fn terminate(&mut self, state: &mut EinState) { if !self.is_active() { return; } self.signal(Signal::TERM); self.state = OffspringState::Notified; let pid = self.process.id(); - self.timer_guard = Some(timer.schedule_with_delay(cfg.graceperiod, move || { + let t_tx = state.timer_tx.clone(); + self.timer_guard = Some(state.timer.schedule_with_delay(state.cfg.graceperiod, move || { t_tx.send(TimerAction::CheckTerminated(pid)); })); } - pub fn shutdown(&mut self, cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender) { + pub fn shutdown(&mut self, state: &mut EinState) { if !self.is_active() { return; } self.signal(Signal::USR2); self.state = OffspringState::Notified; let pid = self.process.id(); - self.timer_guard = Some(timer.schedule_with_delay(cfg.graceperiod , move || { + let t_tx = state.timer_tx.clone(); + self.timer_guard = Some(state.timer.schedule_with_delay(state.cfg.graceperiod , move || { t_tx.send(TimerAction::CheckShutdown(pid)); })); } @@ -162,48 +200,21 @@ impl Offspring { } } -#[derive(Copy, Clone, Debug, PartialEq)] -enum TimerAction { - CheckAlive(u32), - CheckTerminated(u32), - CheckShutdown(u32), -} - -#[derive(Copy, Clone, Debug, PartialEq)] -enum CtrlAction { - Increment, - Decrement, - ManualAck(u32), - SigAll(Signal), - ShutdownAll, - UpgradeAll, - Status, -} - -#[derive(Clone, Debug, PartialEq)] -struct CtrlRequest { - action: CtrlAction, - tx: Sender, -} - -// This is the main event loop -fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { - - //// create timer - let mut timer = timer::Timer::new(); - let (timer_tx, timer_rx): (Sender, Receiver) = chan::async(); +/* * * * * * * * Main Event Loop * * * * * * * */ +fn shepard(mut state: EinState, signal_rx: Receiver) { //// birth the initial set of offspring let mut brood: HashMap = HashMap::new(); - for _ in 0..cfg.count { - let o = Offspring::spawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap(); + for _ in 0..state.cfg.count { + let o = Offspring::spawn(&mut state).unwrap(); let pid = o.process.id(); brood.insert(pid, o); println!("Spawned: {}", pid); } // Ugh, see: http://burntsushi.net/rustdoc/chan/macro.chan_select.html#failure-modes - let ctrl_req_rx = cfg.ctrl_req_rx.clone(); + let ctrl_req_rx = state.ctrl_req_rx.clone(); + let timer_rx = state.timer_rx.clone(); //// infinite select() loop over timers, signals let mut run = true; @@ -214,26 +225,26 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { // Need to move 'o' out of HashMap here so we can mutate // the map in other ways if let Some(mut o) = brood.remove(&pid) { - if !cfg.manual_ack && o.state == OffspringState::Infancy { + if !state.cfg.manual_ack && o.state == OffspringState::Infancy { if o.is_active() { println!("{} found to be alive", pid); o.state = OffspringState::Healthy; if let Some(old_pid) = o.replaces { if let Some(old) = brood.get_mut(&old_pid) { - old.shutdown(&mut cfg, &mut timer, timer_tx.clone()); + old.shutdown(&mut state); } } } - } else if cfg.manual_ack && o.state == OffspringState::Infancy { + } else if state.cfg.manual_ack && o.state == OffspringState::Infancy { println!("{} didn't check in", pid); - if o.attempts + 1 >= cfg.retries { + if o.attempts + 1 >= state.cfg.retries { println!("Ran out of retries..."); } else { - let mut successor = o.respawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap(); + let mut successor = o.respawn(&mut state).unwrap(); successor.attempts = o.attempts + 1; brood.insert(successor.process.id(), successor); } - o.terminate(&mut cfg, &mut timer, timer_tx.clone()); + o.terminate(&mut state); } else { println!("Unexpected CheckAlive state! pid={} state={:?}", o.process.id(), o.state); } @@ -243,7 +254,7 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { TimerAction::CheckShutdown(pid) => { if let Some(o) = brood.get_mut(&pid) { if o.is_active() { - o.terminate(&mut cfg, &mut timer, timer_tx.clone()); + o.terminate(&mut state); } } }, @@ -258,24 +269,25 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { ctrl_req_rx.recv() -> maybe_req => if let Some(req) = maybe_req { match req.action { CtrlAction::Increment => { - let o = Offspring::spawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap(); + let o = Offspring::spawn(&mut state).unwrap(); let pid = o.process.id(); brood.insert(pid, o); - req.tx.send(format!("Spawned! Went from {} to {}", cfg.count, cfg.count+1)); - cfg.count = cfg.count+1; + req.tx.send(format!("Spawned! Went from {} to {}", state.cfg.count, state.cfg.count+1)); + state.cfg.count = state.cfg.count+1; }, CtrlAction::Decrement => { - if cfg.count <= 0 { + if state.cfg.count <= 0 { req.tx.send(format!("Already at count=0, no-op")); continue; } let mut done = false; for (_, o) in brood.iter_mut() { if o.is_active() { - o.shutdown(&mut cfg, &mut timer, timer_tx.clone()); - req.tx.send(format!("Notified! Went from {} to {}", cfg.count, cfg.count-1)); - cfg.count = cfg.count-1; + o.shutdown(&mut state); + req.tx.send(format!("Notified! Went from {} to {}", state.cfg.count, state.cfg.count-1)); + state.cfg.count = state.cfg.count-1; done = true; + break; } } if !done { @@ -292,7 +304,7 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { let mut pid_list = vec![]; for (pid, o) in brood.iter_mut() { if o.is_active() { - o.shutdown(&mut cfg, &mut timer, timer_tx.clone()); + o.shutdown(&mut state); pid_list.push(pid); } } @@ -306,7 +318,7 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { if !o.is_active() { continue; } - o.respawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap() + o.respawn(&mut state).unwrap() }; successor.attempts = 0; brood.insert(successor.process.id(), successor); @@ -336,16 +348,16 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { println!("PID {} exited", pid); if let Some(mut o) = brood.remove(&(pid as u32)) { match o.state { OffspringState::Infancy => { - if o.attempts + 1 >= cfg.retries { + if o.attempts + 1 >= state.cfg.retries { println!("Ran out of retries..."); } else { - let mut successor = o.respawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap(); + let mut successor = o.respawn(&mut state).unwrap(); successor.attempts = o.attempts + 1; brood.insert(successor.process.id(), successor); } }, OffspringState::Healthy => { - let mut successor = o.respawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap(); + let mut successor = o.respawn(&mut state).unwrap(); successor.replaces = Some(pid as u32); brood.insert(successor.process.id(), successor); }, @@ -379,7 +391,7 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { if !o.is_active() { continue; } - o.respawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap() + o.respawn(&mut state).unwrap() }; successor.attempts = 0; brood.insert(successor.process.id(), successor); @@ -392,14 +404,14 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { Signal::INT | Signal::USR2 => { println!("Exiting! Gracefully shutting down children first, but won't wait."); for (_, o) in brood.iter_mut() { - o.shutdown(&mut cfg, &mut timer, timer_tx.clone()); + o.shutdown(&mut state); } run = false; }, Signal::TERM | Signal::QUIT => { println!("Exiting! Killing children first, but won't wait."); for (_, o) in brood.iter_mut() { - o.terminate(&mut cfg, &mut timer, timer_tx.clone()); + o.terminate(&mut state); } run = false; }, @@ -418,8 +430,10 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { println!("Done."); } +/* * * * * * * * Setup and CLI * * * * * * * */ + fn print_usage(opts: Options) { - let brief = "usage:\teinhyrningsins [options] program"; + let brief = "usage:\teinhyrningsins [options] [--] program [program_args]"; println!(""); print!("{}", opts.usage(&brief)); } @@ -438,10 +452,12 @@ fn main() { opts.optflag("m", "manual", "manual (explicit) acknowledge mode"); opts.optopt("n", "number", "how many program copies to spawn", "COUNT"); opts.optmulti("b", "bind", "socket(s) to bind to", "ADDR"); + opts.optopt("d", "socket-path", "where to look for control socket (default: /tmp/einhorn.sock)", "PATH"); + opts.optopt("r", "retries", "how many times to attempt spawning", "RETRIES"); let matches = match opts.parse(&args[1..]) { Ok(m) => { m } - Err(f) => { println!("{}", f.to_string()); print_usage(opts); exit(-1); } + Err(f) => { println!("{}\n", f.to_string()); print_usage(opts); exit(-1); } }; if matches.opt_present("help") { @@ -460,27 +476,22 @@ fn main() { } //// Parse Configuration - let (ctrl_req_tx, ctrl_req_rx): (Sender, Receiver) = chan::async(); - let mut cfg = EinConfig{ - count: 1, - childhood: Duration::seconds(3), - graceperiod: Duration::seconds(3), - retries: 3, - bind_fds: vec![], - ipv4_only: matches.opt_present("4"), - ipv6_only: matches.opt_present("6"), - manual_ack: matches.opt_present("m"), - cmd: Command::new(""), - ctrl_req_rx: ctrl_req_rx, - }; - let path_str = matches.opt_str("socket-path").unwrap_or("/tmp/einhorn.sock".to_string()); - if let Some(n) = matches.opt_str("number") { - cfg.count = u64::from_str(&n).expect("number arg should be an integer"); - } + let count = match matches.opt_str("number") { + Some(n) => u64::from_str(&n).expect("number arg should be an integer"), + None => 1, + }; + + let retries = match matches.opt_str("retries") { + Some(n) => u64::from_str(&n).expect("retries arg should be an integer"), + None => 1, + }; let bind_slugs = matches.opt_strs("bind"); + let ipv4_only = matches.opt_present("4"); + let ipv6_only = matches.opt_present("6"); + let manual_ack = matches.opt_present("m"); let program_and_args = if !matches.free.is_empty() { matches.free @@ -488,25 +499,73 @@ fn main() { println!("Missing program to run (try --help)"); exit(-1); }; + let mut program_and_args = program_and_args.into_iter(); + let cfg = EinConfig { + program: program_and_args.next().unwrap(), + program_args: program_and_args.collect(), + count: count, + childhood: Duration::seconds(3), + graceperiod: Duration::seconds(3), + retries: retries, + ipv4_only: ipv4_only, + ipv6_only: ipv6_only, + manual_ack: manual_ack, + ctrl_path: path_str, + bind_slugs: bind_slugs, + }; - //// Bind Sockets - - // Control socket first + // Control socket first; not same scope as other state // XXX: handle this more gracefully (per-process) - let ctrl_path = Path::new(&path_str); + let tmp = cfg.ctrl_path.clone(); + let ctrl_path = Path::new(&tmp); if ctrl_path.exists() { fs::remove_file(&ctrl_path).unwrap(); } + println!("Binding control socket to: {:?}", ctrl_path); let ctrl_listener = UnixListener::bind(ctrl_path).unwrap(); // XXX: set mode/permissions/owner? + let (ctrl_req_tx, ctrl_req_rx): (Sender, Receiver) = chan::async(); + + //// Listen for signals (before any fork()) + println!("Registering signal handlers..."); + let signal_rx = chan_signal::notify(&[Signal::HUP, + Signal::INT, + Signal::QUIT, + Signal::TERM, + Signal::PIPE, + Signal::ALRM, + Signal::CHLD, // NB: PR has been submitted + Signal::TTIN, + Signal::TTOU, + Signal::USR1, + Signal::USR2, + Signal::STOP, + Signal::CONT]); + + let state = match init(cfg, ctrl_req_rx) { + Ok(s) => s, + Err(e) => { println!("{}", e); exit(-1); }, + }; + + //// Start Constrol Socket Thread + thread::spawn(move || ctrl_socket_serve(ctrl_listener, ctrl_req_tx)); + + //// State Event Loop + shepard(state, signal_rx); + exit(0); +} + +// Initializes config into state +fn init(cfg: EinConfig, ctrl_req_rx: Receiver) -> Result { + // These will be tuples: (SocketAddr, SO_REUSEADDR, O_NONBLOCK) - let sock_confs: Vec<(SocketAddr, bool, bool)> = bind_slugs.iter().map(|b| { + let sock_confs: Vec<(SocketAddr, bool, bool)> = cfg.bind_slugs.iter().map(|b| { let mut r = false; let mut n = false; let mut addr_chunks = b.split(','); - let sock_str = addr_chunks.next().unwrap(); + let sock_str = addr_chunks.next().unwrap(); // safe let mut sock_addrs = sock_str.to_socket_addrs().unwrap(); // ugly let sock = if cfg.ipv4_only { @@ -530,6 +589,7 @@ fn main() { (sock, r, n) }).collect(); + // Configure logging let mut builder = env_logger::LogBuilder::new(); builder.parse("INFO"); if env::var("RUST_LOG").is_ok() { @@ -542,10 +602,10 @@ fn main() { (TcpListener::bind(sa).unwrap(), r, n) }).collect(); - let mut cmd = Command::new(&program_and_args[0]); - cmd.args(&program_and_args[1..]); + let mut cmd = Command::new(cfg.program.clone()); + cmd.args(&cfg.program_args); - cfg.bind_fds = binds.into_iter().map(|t| { + let bind_fds: Vec = binds.into_iter().map(|t| { let b = t.0; let r = t.1; let n = t.2; // ugly let orig_fd = b.into_raw_fd(); // Duplicate, which also clears the CLOEXEC flag @@ -560,36 +620,28 @@ fn main() { fd }).collect(); - cmd.env("EINHORN_FD_COUNT", cfg.bind_fds.len().to_string()); + cmd.env("EINHORN_FD_COUNT", bind_fds.len().to_string()); // This iterator destroys the TcpListeners - for (i, fd) in cfg.bind_fds.iter().enumerate() { + for (i, fd) in bind_fds.iter().enumerate() { cmd.env(format!("EINHORN_FD_{}", i), fd.to_string()); } - cfg.cmd = cmd; - //// Listen for signals (before any fork()) - println!("Registering signal handlers..."); - let signal_rx = chan_signal::notify(&[Signal::HUP, - Signal::INT, - Signal::QUIT, - Signal::TERM, - Signal::PIPE, - Signal::ALRM, - Signal::CHLD, // NB: PR has been submitted - Signal::TTIN, - Signal::TTOU, - Signal::USR1, - Signal::USR2, - Signal::STOP, - Signal::CONT]); - - //// Start Constrol Socket Thread - thread::spawn(move || ctrl_socket_serve(ctrl_listener, ctrl_req_tx)); + // create timer thread + let timer = timer::Timer::new(); + let (timer_tx, timer_rx): (Sender, Receiver) = chan::async(); - shepard(cfg, signal_rx); - exit(0); + Ok(EinState { + cmd: cmd, + ctrl_req_rx: ctrl_req_rx, + cfg: cfg, + timer: timer, + timer_tx: timer_tx, + timer_rx: timer_rx, + }) } +/* * * * * * * * Control Socket Server * * * * * * * */ + const CTRL_SHELL_USAGE: &'static str = r#"\"Command Listing: inc increments number of children -- cgit v1.2.3