aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs155
1 files changed, 92 insertions, 63 deletions
diff --git a/src/main.rs b/src/main.rs
index b7dee25..35968ea 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,10 +16,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#[macro_use]
+extern crate chan;
+
extern crate getopts;
extern crate log;
extern crate env_logger;
extern crate nix;
+extern crate timer;
+extern crate chan_signal;
use std::env;
use std::u64;
@@ -30,29 +35,65 @@ use std::process::Child;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::net::ToSocketAddrs;
+use std::time::{Duration, Instant};
use getopts::Options;
-use std::os::unix::io::IntoRawFd;
-use nix::sys::signal;
+use chan_signal::Signal;
+use chan::{Sender, Receiver};
+use std::os::unix::io::{RawFd, IntoRawFd};
+
+struct EinConfig {
+ childhood: Duration,
+ retries: u64,
+ count: u64,
+ bind_fds: Vec<RawFd>,
+ prog: Command,
+ //XXX:rpc_ask: Sender<String>,
+ //XXX:rpc_reply: Receiver<Result<String, String>>,
+}
-fn run(binds: Vec<TcpListener>, mut prog: Command, number: u64) {
+enum OffspringState {
+ Expectant, // no process exist yet
+ Infancy, // just started, waiting for ACK
+ Healthy,
+ Sick,
+ Notified, // shutting down
+ Dead,
+}
- prog.env("EINHORN_FD_COUNT", binds.len().to_string());
- // This iterator destroys the TcpListeners
- for (i, b) in binds.into_iter().enumerate() {
- let orig_fd = b.into_raw_fd();
- // Duplicate, which also clears the CLOEXEC flag
- //let fd = nix::fcntl::fcntl(nix::fcntl::FcntlArg::F_DUPFD(orig_fd)).unwrap();
- let fd = nix::unistd::dup(orig_fd).unwrap();
- println!("fd={} FD_CLOEXEC={}", fd, nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFD).unwrap());
- prog.env(format!("EINHORN_FD_{}", i), fd.to_string());
- // NB: is fd getting destroyed here?
+struct Offspring {
+ state: OffspringState,
+ process: Option<Child>,
+ birthday: Instant, // specifies the generation
+ attempts: u64,
+}
+
+// This is the main event loop
+fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
+
+ // birth the initial set of offspring
+
+ // create timer
+ let timer = timer::Timer::new();
+ // XXX: these signatures are bogus
+ let (timer_tx, timer_rx): (Sender<u64>, Receiver<u64>) = chan::async();
+
+ // infinite select() loop over timers, signals, rpc
+
+ loop {
+ chan_select! {
+ timer_rx.recv() => println!("Timer tick'd"),
+ signal_rx.recv() => {
+ println!("Signal received!");
+ break;
+ }
+ }
}
let mut children: Vec<Child> = vec![];
- for _ in 0..number {
+ for _ in 0..cfg.count {
println!("Running!");
- children.push(prog.spawn().expect("error spawning"));
+ children.push(cfg.prog.spawn().expect("error spawning"));
}
println!("Waiting for all children to die");
@@ -62,25 +103,6 @@ fn run(binds: Vec<TcpListener>, mut prog: Command, number: u64) {
println!("Done.");
}
-static mut interrupted: bool = false;
-
-extern fn handle_hup(_: i32) {
- println!("This is where I would restart all children gracefully?");
-}
-
-extern fn handle_int(_: i32) {
- let first = unsafe {
- let tmp = !interrupted;
- interrupted = true;
- tmp
- };
- if first {
- println!("Waiting for childred to shutdown gracefully (Ctrl-C again to bail)");
- } else {
- panic!();
- }
-}
-
fn print_usage(opts: Options) {
let brief = "usage:\teinhyrningsins [options] program";
println!("");
@@ -108,14 +130,22 @@ fn main() {
return;
}
- let number: u64 = match matches.opt_str("number") {
+ //// Parse Configuration
+ let mut cfg = EinConfig{
+ count: 1,
+ childhood: Duration::new(3, 0),
+ retries: 3,
+ bind_fds: vec![],
+ prog: Command::new(""),
+ };
+
+ cfg.count = match matches.opt_str("number") {
Some(n) => u64::from_str(&n).expect("number arg should be an integer"),
- None => 1
+ None => 1 // XXX: duplicate default
};
+ //// Bind Sockets
let sock_addrs: Vec<SocketAddr> = matches.opt_strs("bind").iter().map(|b| {
- //let sa: SocketAddr = b.to_socket_addrs().unwrap().next().unwrap();
- //sa
b.to_socket_addrs().unwrap().next().unwrap()
}).collect();
@@ -133,30 +163,6 @@ fn main() {
}
builder.init().unwrap();
- println!("Registering signal handlers...");
- let mut mask_hup = signal::SigSet::empty();
- mask_hup.add(signal::Signal::SIGHUP);
- mask_hup.add(signal::Signal::SIGUSR2);
- let hup_action = signal::SigAction::new(
- signal::SigHandler::Handler(handle_hup),
- signal::SaFlags::empty(),
- mask_hup);
-
- let mut mask_int = signal::SigSet::empty();
- mask_int.add(signal::Signal::SIGINT);
- mask_int.add(signal::Signal::SIGTERM);
- let int_action = signal::SigAction::new(
- signal::SigHandler::Handler(handle_int),
- signal::SaFlags::empty(),
- mask_int);
-
- unsafe {
- signal::sigaction(signal::Signal::SIGHUP, &hup_action).unwrap();
- signal::sigaction(signal::Signal::SIGUSR2, &hup_action).unwrap();
- signal::sigaction(signal::Signal::SIGINT, &int_action).unwrap();
- signal::sigaction(signal::Signal::SIGTERM, &int_action).unwrap();
- }
-
let binds: Vec<TcpListener> = sock_addrs.iter().map(|sa| {
TcpListener::bind(sa).unwrap()
}).collect();
@@ -164,6 +170,29 @@ fn main() {
let mut prog = Command::new(&program_and_args[0]);
prog.args(&program_and_args[1..]);
- run(binds, prog, number);
+ cfg.bind_fds = binds.into_iter().map(|b| {
+ let orig_fd = b.into_raw_fd();
+ // Duplicate, which also clears the CLOEXEC flag
+ let fd = nix::unistd::dup(orig_fd).unwrap();
+ println!("fd={} FD_CLOEXEC={}", fd, nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFD).unwrap());
+ fd
+ }).collect();
+
+ prog.env("EINHORN_FD_COUNT", cfg.bind_fds.len().to_string());
+ // This iterator destroys the TcpListeners
+ for (i, fd) in cfg.bind_fds.iter().enumerate() {
+ prog.env(format!("EINHORN_FD_{}", i), fd.to_string());
+ }
+ cfg.prog = prog;
+
+ //// Listen for signals (before any fork())
+ println!("Registering signal handlers...");
+ let signal_rx = chan_signal::notify(&[Signal::INT,
+ Signal::TERM,
+ //Signal::CHLD, // XXX: PR has been submitted
+ Signal::USR2,
+ Signal::HUP]);
+
+ shepard(cfg, signal_rx);
exit(0);
}