From 5a84c5287040d57b31b6b6d63f9dc938915e80a2 Mon Sep 17 00:00:00 2001 From: bnewbold Date: Sun, 9 Oct 2016 15:37:15 -0700 Subject: more refactoring progress --- Cargo.lock | 10 ++++++ Cargo.toml | 2 ++ src/main.rs | 103 +++++++++++++++++++++++++++++++++++++++++++----------------- 3 files changed, 87 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d04004f..a71cde7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,7 +9,9 @@ dependencies = [ "getopts 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "nix 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", "timer 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "uuid 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -240,6 +242,14 @@ name = "utf8-ranges" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "uuid" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "void" version = "1.0.2" diff --git a/Cargo.toml b/Cargo.toml index 4fcdd2c..923ba92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,8 @@ log = "0.3" env_logger = "0.3" getopts = "^0.2" timer = "0.1" +time = "0.1" chrono = "0.2" chan = "0.1" chan-signal = "0.1" +uuid = { version = "0.2", features = ["v4"] } diff --git a/src/main.rs b/src/main.rs index 35968ea..c1e06b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,7 +24,9 @@ extern crate log; extern crate env_logger; extern crate nix; extern crate timer; +extern crate time; extern crate chan_signal; +extern crate uuid; use std::env; use std::u64; @@ -35,8 +37,11 @@ use std::process::Child; use std::net::SocketAddr; use std::net::TcpListener; use std::net::ToSocketAddrs; -use std::time::{Duration, Instant}; +use std::time::Instant; +use time::Duration; +use std::collections::HashMap; use getopts::Options; +use uuid::Uuid; use chan_signal::Signal; use chan::{Sender, Receiver}; @@ -47,59 +52,99 @@ struct EinConfig { retries: u64, count: u64, bind_fds: Vec, - prog: Command, - //XXX:rpc_ask: Sender, - //XXX:rpc_reply: Receiver>, + cmd: Command, + //TODO:rpc_ask: Sender, + //TODO:rpc_reply: Receiver>, } +#[derive(Copy, Clone, Debug, PartialEq)] enum OffspringState { Expectant, // no process exist yet Infancy, // just started, waiting for ACK Healthy, - Sick, - Notified, // shutting down + Notified, // shutting down gracefully Dead, } struct Offspring { + id: Uuid, state: OffspringState, process: Option, birthday: Instant, // specifies the generation attempts: u64, } +impl Offspring { + + pub fn new() -> Offspring { + Offspring { + id: Uuid::new_v4(), + state: OffspringState::Expectant, + process: None, + birthday: Instant::now(), + attempts: 0, + } + } + + pub fn spawn(&mut self, cfg: &mut EinConfig) -> Result<(), String> { + if self.state != OffspringState::Expectant && self.state != OffspringState::Dead { + return Err(format!("Can't spawn from state: {:?}", self.state)); + } + self.process = Some(cfg.cmd.spawn().expect("error spawning")); + self.birthday = Instant::now(); + self.attempts = 0; + Ok(()) + } +} + +#[derive(Copy, Clone, Debug, PartialEq)] +enum TimerAction { + CheckAlive(Uuid), +} + // This is the main event loop fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { - // birth the initial set of offspring - - // create timer + //// create timer let timer = timer::Timer::new(); // XXX: these signatures are bogus - let (timer_tx, timer_rx): (Sender, Receiver) = chan::async(); + let (timer_tx, timer_rx): (Sender, Receiver) = chan::async(); - // infinite select() loop over timers, signals, rpc + //// birth the initial set of offspring + let mut brood: HashMap = HashMap::new(); + for _ in 0..cfg.count { + println!("Running!"); + let mut o = Offspring::new(); + o.spawn(&mut cfg).unwrap(); + let t_tx = timer_tx.clone(); + let o_id = o.id.clone(); + timer.schedule_with_delay(cfg.childhood, move || { + t_tx.send(TimerAction::CheckAlive(o_id)); + }); + brood.insert(o.id, o); + } + //// infinite select() loop over timers, signals, rpc loop { chan_select! { timer_rx.recv() => println!("Timer tick'd"), - signal_rx.recv() => { - println!("Signal received!"); + signal_rx.recv() -> sig => { + println!(""); + println!("Signal received! {:?}", sig); break; } } } - let mut children: Vec = vec![]; - for _ in 0..cfg.count { - println!("Running!"); - children.push(cfg.prog.spawn().expect("error spawning")); - } - +/* XXX: println!("Waiting for all children to die"); - for mut c in children { - c.wait().unwrap(); + for mut o in brood.values() { + match o.process { + Some(ref mut p) => { p.wait().unwrap(); () }, + None => (), + } } +*/ println!("Done."); } @@ -133,10 +178,10 @@ fn main() { //// Parse Configuration let mut cfg = EinConfig{ count: 1, - childhood: Duration::new(3, 0), + childhood: Duration::seconds(3), retries: 3, bind_fds: vec![], - prog: Command::new(""), + cmd: Command::new(""), }; cfg.count = match matches.opt_str("number") { @@ -167,8 +212,10 @@ fn main() { TcpListener::bind(sa).unwrap() }).collect(); - let mut prog = Command::new(&program_and_args[0]); - prog.args(&program_and_args[1..]); + let mut cmd = Command::new(&program_and_args[0]); + cmd.args(&program_and_args[1..]); + + // TODO: check that program exists and is executable cfg.bind_fds = binds.into_iter().map(|b| { let orig_fd = b.into_raw_fd(); @@ -178,12 +225,12 @@ fn main() { fd }).collect(); - prog.env("EINHORN_FD_COUNT", cfg.bind_fds.len().to_string()); + cmd.env("EINHORN_FD_COUNT", cfg.bind_fds.len().to_string()); // This iterator destroys the TcpListeners for (i, fd) in cfg.bind_fds.iter().enumerate() { - prog.env(format!("EINHORN_FD_{}", i), fd.to_string()); + cmd.env(format!("EINHORN_FD_{}", i), fd.to_string()); } - cfg.prog = prog; + cfg.cmd = cmd; //// Listen for signals (before any fork()) println!("Registering signal handlers..."); -- cgit v1.2.3