aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs103
1 files changed, 75 insertions, 28 deletions
diff --git a/src/main.rs b/src/main.rs
index 35968ea..c1e06b7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -24,7 +24,9 @@ extern crate log;
extern crate env_logger;
extern crate nix;
extern crate timer;
+extern crate time;
extern crate chan_signal;
+extern crate uuid;
use std::env;
use std::u64;
@@ -35,8 +37,11 @@ use std::process::Child;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::net::ToSocketAddrs;
-use std::time::{Duration, Instant};
+use std::time::Instant;
+use time::Duration;
+use std::collections::HashMap;
use getopts::Options;
+use uuid::Uuid;
use chan_signal::Signal;
use chan::{Sender, Receiver};
@@ -47,59 +52,99 @@ struct EinConfig {
retries: u64,
count: u64,
bind_fds: Vec<RawFd>,
- prog: Command,
- //XXX:rpc_ask: Sender<String>,
- //XXX:rpc_reply: Receiver<Result<String, String>>,
+ cmd: Command,
+ //TODO:rpc_ask: Sender<String>,
+ //TODO:rpc_reply: Receiver<Result<String, String>>,
}
+#[derive(Copy, Clone, Debug, PartialEq)]
enum OffspringState {
Expectant, // no process exist yet
Infancy, // just started, waiting for ACK
Healthy,
- Sick,
- Notified, // shutting down
+ Notified, // shutting down gracefully
Dead,
}
struct Offspring {
+ id: Uuid,
state: OffspringState,
process: Option<Child>,
birthday: Instant, // specifies the generation
attempts: u64,
}
+impl Offspring {
+
+ pub fn new() -> Offspring {
+ Offspring {
+ id: Uuid::new_v4(),
+ state: OffspringState::Expectant,
+ process: None,
+ birthday: Instant::now(),
+ attempts: 0,
+ }
+ }
+
+ pub fn spawn(&mut self, cfg: &mut EinConfig) -> Result<(), String> {
+ if self.state != OffspringState::Expectant && self.state != OffspringState::Dead {
+ return Err(format!("Can't spawn from state: {:?}", self.state));
+ }
+ self.process = Some(cfg.cmd.spawn().expect("error spawning"));
+ self.birthday = Instant::now();
+ self.attempts = 0;
+ Ok(())
+ }
+}
+
+#[derive(Copy, Clone, Debug, PartialEq)]
+enum TimerAction {
+ CheckAlive(Uuid),
+}
+
// This is the main event loop
fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
- // birth the initial set of offspring
-
- // create timer
+ //// create timer
let timer = timer::Timer::new();
// XXX: these signatures are bogus
- let (timer_tx, timer_rx): (Sender<u64>, Receiver<u64>) = chan::async();
+ let (timer_tx, timer_rx): (Sender<TimerAction>, Receiver<TimerAction>) = chan::async();
- // infinite select() loop over timers, signals, rpc
+ //// birth the initial set of offspring
+ let mut brood: HashMap<Uuid, Offspring> = HashMap::new();
+ for _ in 0..cfg.count {
+ println!("Running!");
+ let mut o = Offspring::new();
+ o.spawn(&mut cfg).unwrap();
+ let t_tx = timer_tx.clone();
+ let o_id = o.id.clone();
+ timer.schedule_with_delay(cfg.childhood, move || {
+ t_tx.send(TimerAction::CheckAlive(o_id));
+ });
+ brood.insert(o.id, o);
+ }
+ //// infinite select() loop over timers, signals, rpc
loop {
chan_select! {
timer_rx.recv() => println!("Timer tick'd"),
- signal_rx.recv() => {
- println!("Signal received!");
+ signal_rx.recv() -> sig => {
+ println!("");
+ println!("Signal received! {:?}", sig);
break;
}
}
}
- let mut children: Vec<Child> = vec![];
- for _ in 0..cfg.count {
- println!("Running!");
- children.push(cfg.prog.spawn().expect("error spawning"));
- }
-
+/* XXX:
println!("Waiting for all children to die");
- for mut c in children {
- c.wait().unwrap();
+ for mut o in brood.values() {
+ match o.process {
+ Some(ref mut p) => { p.wait().unwrap(); () },
+ None => (),
+ }
}
+*/
println!("Done.");
}
@@ -133,10 +178,10 @@ fn main() {
//// Parse Configuration
let mut cfg = EinConfig{
count: 1,
- childhood: Duration::new(3, 0),
+ childhood: Duration::seconds(3),
retries: 3,
bind_fds: vec![],
- prog: Command::new(""),
+ cmd: Command::new(""),
};
cfg.count = match matches.opt_str("number") {
@@ -167,8 +212,10 @@ fn main() {
TcpListener::bind(sa).unwrap()
}).collect();
- let mut prog = Command::new(&program_and_args[0]);
- prog.args(&program_and_args[1..]);
+ let mut cmd = Command::new(&program_and_args[0]);
+ cmd.args(&program_and_args[1..]);
+
+ // TODO: check that program exists and is executable
cfg.bind_fds = binds.into_iter().map(|b| {
let orig_fd = b.into_raw_fd();
@@ -178,12 +225,12 @@ fn main() {
fd
}).collect();
- prog.env("EINHORN_FD_COUNT", cfg.bind_fds.len().to_string());
+ cmd.env("EINHORN_FD_COUNT", cfg.bind_fds.len().to_string());
// This iterator destroys the TcpListeners
for (i, fd) in cfg.bind_fds.iter().enumerate() {
- prog.env(format!("EINHORN_FD_{}", i), fd.to_string());
+ cmd.env(format!("EINHORN_FD_{}", i), fd.to_string());
}
- cfg.prog = prog;
+ cfg.cmd = cmd;
//// Listen for signals (before any fork())
println!("Registering signal handlers...");