aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbnewbold <bnewbold@robocracy.org>2016-10-11 01:52:37 -0700
committerbnewbold <bnewbold@robocracy.org>2016-10-11 01:55:53 -0700
commit72bea95d8ba781cf1c5641987179b5c45a28f919 (patch)
tree77b121d70de4b3e7ce02ddbaf3de9ec50397faef
parentec8e71a1714ab6cb56f1a22ff1c51b6dd051c7f0 (diff)
downloadeinhyrningsins-72bea95d8ba781cf1c5641987179b5c45a28f919.tar.gz
einhyrningsins-72bea95d8ba781cf1c5641987179b5c45a28f919.zip
refactor of Offspring+brood code
-rw-r--r--src/main.rs169
1 files changed, 122 insertions, 47 deletions
diff --git a/src/main.rs b/src/main.rs
index 8393760..9754ab7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -26,7 +26,6 @@ extern crate nix;
extern crate timer;
extern crate time;
extern crate chan_signal;
-extern crate uuid;
use std::env;
use std::u64;
@@ -37,11 +36,10 @@ use std::process::Child;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::net::ToSocketAddrs;
-use std::time::Instant;
+//use std::time::Instant;
use time::Duration;
use std::collections::HashMap;
use getopts::Options;
-use uuid::Uuid;
use chan_signal::Signal;
use chan::{Sender, Receiver};
@@ -49,6 +47,8 @@ use std::os::unix::io::{RawFd, IntoRawFd};
struct EinConfig {
childhood: Duration,
+ graceperiod: Duration,
+ manual_ack: bool,
retries: u64,
count: u64,
bind_fds: Vec<RawFd>,
@@ -59,7 +59,6 @@ struct EinConfig {
#[derive(Copy, Clone, Debug, PartialEq)]
enum OffspringState {
- Expectant, // no process exist yet
Infancy, // just started, waiting for ACK
Healthy,
Notified, // shutting down gracefully
@@ -67,38 +66,40 @@ enum OffspringState {
}
struct Offspring {
- id: Uuid,
state: OffspringState,
- process: Option<Child>,
- birthday: Instant, // specifies the generation
+ process: Child,
+ // birthday: Instant, // specifies the generation
attempts: u64,
+ timer_guard: Option<timer::Guard>,
+ replaces: Option<u32>,
}
impl Offspring {
- pub fn new() -> Offspring {
- Offspring {
- id: Uuid::new_v4(),
- state: OffspringState::Expectant,
- process: None,
- birthday: Instant::now(),
+ pub fn spawn(cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender<TimerAction>) -> Result<Offspring, String> {
+ let mut o = Offspring {
+ state: OffspringState::Infancy,
+ process: cfg.cmd.spawn().expect("error spawning"),
+ // birthday: Instant::now(),
attempts: 0,
- }
+ timer_guard: None,
+ replaces: None,
+ };
+ let pid = o.process.id();
+ o.timer_guard = Some(timer.schedule_with_delay(cfg.childhood, move || {
+ t_tx.send(TimerAction::CheckAlive(pid));
+ }));
+ Ok(o)
}
- pub fn spawn(&mut self, cfg: &mut EinConfig) -> Result<(), String> {
- if self.is_active() {
- return Err(format!("Can't spawn from state: {:?}", self.state));
- }
- self.process = Some(cfg.cmd.spawn().expect("error spawning"));
- self.birthday = Instant::now();
- self.attempts = 0;
- Ok(())
+ pub fn respawn(&mut self, cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender<TimerAction>) -> Result<Offspring, String> {
+ let mut successor = try!(Offspring::spawn(cfg, timer, t_tx));
+ successor.replaces = Some(self.process.id());
+ Ok(successor)
}
pub fn is_active(&self) -> bool {
match self.state {
- OffspringState::Expectant => false,
OffspringState::Infancy => true,
OffspringState::Healthy => true,
OffspringState::Notified => true,
@@ -106,53 +107,127 @@ impl Offspring {
}
}
+ pub fn kill(&mut self) {
+ if !self.is_active() { return; }
+ self.signal(Signal::KILL);
+ self.state = OffspringState::Dead;
+ }
+
+ pub fn terminate(&mut self, cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender<TimerAction>) {
+ if !self.is_active() { return; }
+ self.signal(Signal::TERM);
+ self.state = OffspringState::Notified;
+ let pid = self.process.id();
+ self.timer_guard = Some(timer.schedule_with_delay(cfg.graceperiod, move || {
+ t_tx.send(TimerAction::CheckTerminated(pid));
+ }));
+ }
+
+ pub fn shutdown(&mut self, cfg: &mut EinConfig, timer: &mut timer::Timer, t_tx: Sender<TimerAction>) {
+ if !self.is_active() { return; }
+ self.signal(Signal::USR2);
+ self.state = OffspringState::Notified;
+ let pid = self.process.id();
+ self.timer_guard = Some(timer.schedule_with_delay(cfg.graceperiod , move || {
+ t_tx.send(TimerAction::CheckShutdown(pid));
+ }));
+ }
+
pub fn signal(&mut self, sig: Signal) {
- if !self.is_active() {
+ if self.state == OffspringState::Dead {
return;
}
let nix_sig = match sig {
Signal::HUP => nix::sys::signal::Signal::SIGHUP,
Signal::INT => nix::sys::signal::Signal::SIGINT,
+ Signal::TERM => nix::sys::signal::Signal::SIGTERM,
+ Signal::KILL => nix::sys::signal::Signal::SIGKILL,
_ => { println!("Unexpected signal: {:?}", sig); return; },
};
- match self.process {
- Some(ref p) => { nix::sys::signal::kill(p.id() as i32, nix_sig).unwrap(); },
- None => (),
- }
+ nix::sys::signal::kill(self.process.id() as i32, nix_sig).unwrap();
}
}
#[derive(Copy, Clone, Debug, PartialEq)]
enum TimerAction {
- CheckAlive(Uuid),
+ CheckAlive(u32),
+ CheckTerminated(u32),
+ CheckShutdown(u32),
}
+/*
+ Result<WaitStatus>
+ nix::sys::wait::waitpid
+*/
+
// This is the main event loop
fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
//// create timer
- let timer = timer::Timer::new();
+ let mut timer = timer::Timer::new();
let (timer_tx, timer_rx): (Sender<TimerAction>, Receiver<TimerAction>) = chan::async();
//// birth the initial set of offspring
- let mut brood: HashMap<Uuid, Offspring> = HashMap::new();
+ let mut brood: HashMap<u32, Offspring> = HashMap::new();
for _ in 0..cfg.count {
- println!("Running!");
- let mut o = Offspring::new();
- o.spawn(&mut cfg).unwrap();
- let t_tx = timer_tx.clone();
- let o_id = o.id.clone();
- timer.schedule_with_delay(cfg.childhood, move || {
- t_tx.send(TimerAction::CheckAlive(o_id));
- });
- brood.insert(o.id, o);
+ let o = Offspring::spawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap();
+ let pid = o.process.id();
+ brood.insert(pid, o);
+ println!("Spawned: {}", pid);
}
//// infinite select() loop over timers, signals
loop {
chan_select! {
- timer_rx.recv() => { println!("Timer tick'd"); },
+ timer_rx.recv() -> action => match action.expect("Error with timer thread") {
+ TimerAction::CheckAlive(pid) => {
+ // Need to move 'o' out of HashMap here so we can mutate
+ // the map in other ways
+ if let Some(mut o) = brood.remove(&pid) {
+ if !cfg.manual_ack && o.state == OffspringState::Infancy {
+ if o.is_active() {
+ println!("{} found to be alive", pid);
+ o.state = OffspringState::Healthy;
+ if let Some(old_pid) = o.replaces {
+ if let Some(old) = brood.get_mut(&old_pid) {
+ old.shutdown(&mut cfg, &mut timer, timer_tx.clone());
+ }
+ }
+ }
+ } else if cfg.manual_ack && o.state == OffspringState::Infancy {
+ println!("{} didn't check in", pid);
+ if o.attempts + 1 >= cfg.retries {
+ println!("Ran out of retries...");
+ } else {
+ let mut successor = o.respawn(&mut cfg, &mut timer, timer_tx.clone()).unwrap();
+ successor.attempts = o.attempts + 1;
+ brood.insert(successor.process.id(), successor);
+ }
+ o.terminate(&mut cfg, &mut timer, timer_tx.clone());
+ } else {
+ println!("Unexpected CheckAlive state! pid={} state={:?}", o.process.id(), o.state);
+ }
+ brood.insert(pid, o);
+ };
+ }
+ TimerAction::CheckShutdown(pid) => {
+ if let Some(o) = brood.get_mut(&pid) {
+ if o.is_active() {
+ o.terminate(&mut cfg, &mut timer, timer_tx.clone());
+ }
+ }
+ },
+ TimerAction::CheckTerminated(pid) => {
+ if let Some(o) = brood.get_mut(&pid) {
+ if o.is_active() {
+ o.kill();
+ }
+ }
+ },
+ },
signal_rx.recv() -> sig => match sig.expect("Error with signal handler") {
+ Signal::USR1 => { // USING AS PLACEHOLDER FOR SIGCHLD
+ },
Signal::HUP => {
for (_, o) in brood.iter_mut() {
o.signal(sig.unwrap());
@@ -160,21 +235,18 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
Signal::INT | Signal::TERM => {
println!("Notifying children...");
for (_, o) in brood.iter_mut() {
- o.signal(sig.unwrap());
+ o.terminate(&mut cfg, &mut timer, timer_tx.clone());
}
break;
},
_ => ()
- }
+ },
}
}
println!("Waiting for all children to die");
for (_, o) in brood.iter_mut() {
- match o.process {
- Some(ref mut p) => { p.wait().unwrap(); () },
- None => (),
- }
+ o.process.wait().unwrap();
}
println!("Done.");
}
@@ -195,6 +267,7 @@ fn main() {
opts.optflag("v", "verbose", "more debugging messages");
opts.optflag("4", "ipv4-only", "only accept IPv4 connections");
opts.optflag("6", "ipv6-only", "only accept IPv6 connections");
+ opts.optflag("m", "manual", "manual (explicit) acknowledge mode");
opts.optopt("n", "number", "how many program copies to spawn", "COUNT");
opts.optmulti("b", "bind", "socket(s) to bind to", "ADDR");
@@ -217,10 +290,12 @@ fn main() {
let mut cfg = EinConfig{
count: 1,
childhood: Duration::seconds(3),
+ graceperiod: Duration::seconds(3),
retries: 3,
bind_fds: vec![],
ipv4_only: matches.opt_present("4"),
ipv6_only: matches.opt_present("6"),
+ manual_ack: matches.opt_present("m"),
cmd: Command::new(""),
};