From 72bea95d8ba781cf1c5641987179b5c45a28f919 Mon Sep 17 00:00:00 2001 From: bnewbold Date: Tue, 11 Oct 2016 01:52:37 -0700 Subject: refactor of Offspring+brood code --- src/main.rs | 169 +++++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 122 insertions(+), 47 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index 8393760..9754ab7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,7 +26,6 @@ extern crate nix; extern crate timer; extern crate time; extern crate chan_signal; -extern crate uuid; use std::env; use std::u64; @@ -37,11 +36,10 @@ use std::process::Child; use std::net::SocketAddr; use std::net::TcpListener; use std::net::ToSocketAddrs; -use std::time::Instant; +//use std::time::Instant; use time::Duration; use std::collections::HashMap; use getopts::Options; -use uuid::Uuid; use chan_signal::Signal; use chan::{Sender, Receiver}; @@ -49,6 +47,8 @@ use std::os::unix::io::{RawFd, IntoRawFd}; struct EinConfig { childhood: Duration, + graceperiod: Duration, + manual_ack: bool, retries: u64, count: u64, bind_fds: Vec, @@ -59,7 +59,6 @@ struct EinConfig { #[derive(Copy, Clone, Debug, PartialEq)] enum OffspringState { - Expectant, // no process exist yet Infancy, // just started, waiting for ACK Healthy, Notified, // shutting down gracefully @@ -67,38 +66,40 @@ enum OffspringState { } struct Offspring { - id: Uuid, state: OffspringState, - process: Option, - birthday: Instant, // specifies the generation + process: Child, + // birthday: Instant, // specifies the generation attempts: u64, + timer_guard: Option, + replaces: Option, } impl Offspring { - pub fn new() -> Offspring { - Offspring { - id: Uuid::new_v4(), - state: OffspringState::Expectant, - process: None, - birthday: Instant::now(), + pub fn spawn(cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender) -> Result { + let mut o = Offspring { + state: OffspringState::Infancy, + process: cfg.cmd.spawn().expect("error spawning"), + // birthday: Instant::now(), attempts: 0, - } + timer_guard: None, + replaces: None, + }; + let pid = o.process.id(); + o.timer_guard = Some(timer.schedule_with_delay(cfg.childhood, move || { + t_tx.send(TimerAction::CheckAlive(pid)); + })); + Ok(o) } - pub fn spawn(&mut self, cfg: &mut EinConfig) -> Result<(), String> { - if self.is_active() { - return Err(format!("Can't spawn from state: {:?}", self.state)); - } - self.process = Some(cfg.cmd.spawn().expect("error spawning")); - self.birthday = Instant::now(); - self.attempts = 0; - Ok(()) + pub fn respawn(&mut self, cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender) -> Result { + let mut successor = try!(Offspring::spawn(cfg, timer, t_tx)); + successor.replaces = Some(self.process.id()); + Ok(successor) } pub fn is_active(&self) -> bool { match self.state { - OffspringState::Expectant => false, OffspringState::Infancy => true, OffspringState::Healthy => true, OffspringState::Notified => true, @@ -106,53 +107,127 @@ impl Offspring { } } + pub fn kill(&mut self) { + if !self.is_active() { return; } + self.signal(Signal::KILL); + self.state = OffspringState::Dead; + } + + pub fn terminate(&mut self, cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender) { + if !self.is_active() { return; } + self.signal(Signal::TERM); + self.state = OffspringState::Notified; + let pid = self.process.id(); + self.timer_guard = Some(timer.schedule_with_delay(cfg.graceperiod, move || { + t_tx.send(TimerAction::CheckTerminated(pid)); + })); + } + + pub fn shutdown(&mut self, cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender) { + if !self.is_active() { return; } + self.signal(Signal::USR2); + self.state = OffspringState::Notified; + let pid = self.process.id(); + self.timer_guard = Some(timer.schedule_with_delay(cfg.graceperiod , move || { + t_tx.send(TimerAction::CheckShutdown(pid)); + })); + } + pub fn signal(&mut self, sig: Signal) { - if !self.is_active() { + if self.state == OffspringState::Dead { return; } let nix_sig = match sig { Signal::HUP => nix::sys::signal::Signal::SIGHUP, Signal::INT => nix::sys::signal::Signal::SIGINT, + Signal::TERM => nix::sys::signal::Signal::SIGTERM, + Signal::KILL => nix::sys::signal::Signal::SIGKILL, _ => { println!("Unexpected signal: {:?}", sig); return; }, }; - match self.process { - Some(ref p) => { nix::sys::signal::kill(p.id() as i32, nix_sig).unwrap(); }, - None => (), - } + nix::sys::signal::kill(self.process.id() as i32, nix_sig).unwrap(); } } #[derive(Copy, Clone, Debug, PartialEq)] enum TimerAction { - CheckAlive(Uuid), + CheckAlive(u32), + CheckTerminated(u32), + CheckShutdown(u32), } +/* + Result + nix::sys::wait::waitpid +*/ + // This is the main event loop fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { //// create timer - let timer = timer::Timer::new(); + let mut timer = timer::Timer::new(); let (timer_tx, timer_rx): (Sender, Receiver) = chan::async(); //// birth the initial set of offspring - let mut brood: HashMap = HashMap::new(); + let mut brood: HashMap = HashMap::new(); for _ in 0..cfg.count { - println!("Running!"); - let mut o = Offspring::new(); - o.spawn(&mut cfg).unwrap(); - let t_tx = timer_tx.clone(); - let o_id = o.id.clone(); - timer.schedule_with_delay(cfg.childhood, move || { - t_tx.send(TimerAction::CheckAlive(o_id)); - }); - brood.insert(o.id, o); + let o = Offspring::spawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap(); + let pid = o.process.id(); + brood.insert(pid, o); + println!("Spawned: {}", pid); } //// infinite select() loop over timers, signals loop { chan_select! { - timer_rx.recv() => { println!("Timer tick'd"); }, + timer_rx.recv() -> action => match action.expect("Error with timer thread") { + TimerAction::CheckAlive(pid) => { + // Need to move 'o' out of HashMap here so we can mutate + // the map in other ways + if let Some(mut o) = brood.remove(&pid) { + if !cfg.manual_ack && o.state == OffspringState::Infancy { + if o.is_active() { + println!("{} found to be alive", pid); + o.state = OffspringState::Healthy; + if let Some(old_pid) = o.replaces { + if let Some(old) = brood.get_mut(&old_pid) { + old.shutdown(&mut cfg, &mut timer, timer_tx.clone()); + } + } + } + } else if cfg.manual_ack && o.state == OffspringState::Infancy { + println!("{} didn't check in", pid); + if o.attempts + 1 >= cfg.retries { + println!("Ran out of retries..."); + } else { + let mut successor = o.respawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap(); + successor.attempts = o.attempts + 1; + brood.insert(successor.process.id(), successor); + } + o.terminate(&mut cfg, &mut timer, timer_tx.clone()); + } else { + println!("Unexpected CheckAlive state! pid={} state={:?}", o.process.id(), o.state); + } + brood.insert(pid, o); + }; + } + TimerAction::CheckShutdown(pid) => { + if let Some(o) = brood.get_mut(&pid) { + if o.is_active() { + o.terminate(&mut cfg, &mut timer, timer_tx.clone()); + } + } + }, + TimerAction::CheckTerminated(pid) => { + if let Some(o) = brood.get_mut(&pid) { + if o.is_active() { + o.kill(); + } + } + }, + }, signal_rx.recv() -> sig => match sig.expect("Error with signal handler") { + Signal::USR1 => { // USING AS PLACEHOLDER FOR SIGCHLD + }, Signal::HUP => { for (_, o) in brood.iter_mut() { o.signal(sig.unwrap()); @@ -160,21 +235,18 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { Signal::INT | Signal::TERM => { println!("Notifying children..."); for (_, o) in brood.iter_mut() { - o.signal(sig.unwrap()); + o.terminate(&mut cfg, &mut timer, timer_tx.clone()); } break; }, _ => () - } + }, } } println!("Waiting for all children to die"); for (_, o) in brood.iter_mut() { - match o.process { - Some(ref mut p) => { p.wait().unwrap(); () }, - None => (), - } + o.process.wait().unwrap(); } println!("Done."); } @@ -195,6 +267,7 @@ fn main() { opts.optflag("v", "verbose", "more debugging messages"); opts.optflag("4", "ipv4-only", "only accept IPv4 connections"); opts.optflag("6", "ipv6-only", "only accept IPv6 connections"); + opts.optflag("m", "manual", "manual (explicit) acknowledge mode"); opts.optopt("n", "number", "how many program copies to spawn", "COUNT"); opts.optmulti("b", "bind", "socket(s) to bind to", "ADDR"); @@ -217,10 +290,12 @@ fn main() { let mut cfg = EinConfig{ count: 1, childhood: Duration::seconds(3), + graceperiod: Duration::seconds(3), retries: 3, bind_fds: vec![], ipv4_only: matches.opt_present("4"), ipv6_only: matches.opt_present("6"), + manual_ack: matches.opt_present("m"), cmd: Command::new(""), }; -- cgit v1.2.3