From 44bebada4207fd7fbe133eea834ed255de3f14ae Mon Sep 17 00:00:00 2001 From: bnewbold Date: Sun, 9 Oct 2016 00:46:59 -0700 Subject: start refactoring to chan-signals --- Cargo.lock | 108 ++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 4 ++ src/main.rs | 155 ++++++++++++++++++++++++++++++++++++------------------------ 3 files changed, 204 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3a0e669..d04004f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,10 +2,14 @@ name = "einhyrningsins" version = "0.1.0" dependencies = [ + "chan 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "chan-signal 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "chrono 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "getopts 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "nix 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "timer 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -16,6 +20,19 @@ dependencies = [ "memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "bit-set" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bit-vec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "bit-vec" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "bitflags" version = "0.4.0" @@ -26,6 +43,34 @@ name = "cfg-if" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "chan" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "chan-signal" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bit-set 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "chan 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "chrono" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "env_logger" version = "0.3.5" @@ -49,6 +94,11 @@ dependencies = [ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "lazy_static" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "libc" version = "0.2.16" @@ -80,6 +130,46 @@ dependencies = [ "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "num" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-integer 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)", + "num-iter 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "num-integer" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-traits 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "num-iter" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-integer 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "num-traits" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "rand" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "regex" version = "0.1.77" @@ -127,6 +217,24 @@ dependencies = [ "thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "time" +version = "0.1.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "timer" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "chrono 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "utf8-ranges" version = "0.1.3" diff --git a/Cargo.toml b/Cargo.toml index be7fd7f..4fcdd2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,3 +8,7 @@ nix = "0.7" log = "0.3" env_logger = "0.3" getopts = "^0.2" +timer = "0.1" +chrono = "0.2" +chan = "0.1" +chan-signal = "0.1" diff --git a/src/main.rs b/src/main.rs index b7dee25..35968ea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,10 +16,15 @@ * along with this program. If not, see . */ +#[macro_use] +extern crate chan; + extern crate getopts; extern crate log; extern crate env_logger; extern crate nix; +extern crate timer; +extern crate chan_signal; use std::env; use std::u64; @@ -30,29 +35,65 @@ use std::process::Child; use std::net::SocketAddr; use std::net::TcpListener; use std::net::ToSocketAddrs; +use std::time::{Duration, Instant}; use getopts::Options; -use std::os::unix::io::IntoRawFd; -use nix::sys::signal; +use chan_signal::Signal; +use chan::{Sender, Receiver}; +use std::os::unix::io::{RawFd, IntoRawFd}; + +struct EinConfig { + childhood: Duration, + retries: u64, + count: u64, + bind_fds: Vec, + prog: Command, + //XXX:rpc_ask: Sender, + //XXX:rpc_reply: Receiver>, +} -fn run(binds: Vec, mut prog: Command, number: u64) { +enum OffspringState { + Expectant, // no process exist yet + Infancy, // just started, waiting for ACK + Healthy, + Sick, + Notified, // shutting down + Dead, +} - prog.env("EINHORN_FD_COUNT", binds.len().to_string()); - // This iterator destroys the TcpListeners - for (i, b) in binds.into_iter().enumerate() { - let orig_fd = b.into_raw_fd(); - // Duplicate, which also clears the CLOEXEC flag - //let fd = nix::fcntl::fcntl(nix::fcntl::FcntlArg::F_DUPFD(orig_fd)).unwrap(); - let fd = nix::unistd::dup(orig_fd).unwrap(); - println!("fd={} FD_CLOEXEC={}", fd, nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFD).unwrap()); - prog.env(format!("EINHORN_FD_{}", i), fd.to_string()); - // NB: is fd getting destroyed here? +struct Offspring { + state: OffspringState, + process: Option, + birthday: Instant, // specifies the generation + attempts: u64, +} + +// This is the main event loop +fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { + + // birth the initial set of offspring + + // create timer + let timer = timer::Timer::new(); + // XXX: these signatures are bogus + let (timer_tx, timer_rx): (Sender, Receiver) = chan::async(); + + // infinite select() loop over timers, signals, rpc + + loop { + chan_select! { + timer_rx.recv() => println!("Timer tick'd"), + signal_rx.recv() => { + println!("Signal received!"); + break; + } + } } let mut children: Vec = vec![]; - for _ in 0..number { + for _ in 0..cfg.count { println!("Running!"); - children.push(prog.spawn().expect("error spawning")); + children.push(cfg.prog.spawn().expect("error spawning")); } println!("Waiting for all children to die"); @@ -62,25 +103,6 @@ fn run(binds: Vec, mut prog: Command, number: u64) { println!("Done."); } -static mut interrupted: bool = false; - -extern fn handle_hup(_: i32) { - println!("This is where I would restart all children gracefully?"); -} - -extern fn handle_int(_: i32) { - let first = unsafe { - let tmp = !interrupted; - interrupted = true; - tmp - }; - if first { - println!("Waiting for childred to shutdown gracefully (Ctrl-C again to bail)"); - } else { - panic!(); - } -} - fn print_usage(opts: Options) { let brief = "usage:\teinhyrningsins [options] program"; println!(""); @@ -108,14 +130,22 @@ fn main() { return; } - let number: u64 = match matches.opt_str("number") { + //// Parse Configuration + let mut cfg = EinConfig{ + count: 1, + childhood: Duration::new(3, 0), + retries: 3, + bind_fds: vec![], + prog: Command::new(""), + }; + + cfg.count = match matches.opt_str("number") { Some(n) => u64::from_str(&n).expect("number arg should be an integer"), - None => 1 + None => 1 // XXX: duplicate default }; + //// Bind Sockets let sock_addrs: Vec = matches.opt_strs("bind").iter().map(|b| { - //let sa: SocketAddr = b.to_socket_addrs().unwrap().next().unwrap(); - //sa b.to_socket_addrs().unwrap().next().unwrap() }).collect(); @@ -133,30 +163,6 @@ fn main() { } builder.init().unwrap(); - println!("Registering signal handlers..."); - let mut mask_hup = signal::SigSet::empty(); - mask_hup.add(signal::Signal::SIGHUP); - mask_hup.add(signal::Signal::SIGUSR2); - let hup_action = signal::SigAction::new( - signal::SigHandler::Handler(handle_hup), - signal::SaFlags::empty(), - mask_hup); - - let mut mask_int = signal::SigSet::empty(); - mask_int.add(signal::Signal::SIGINT); - mask_int.add(signal::Signal::SIGTERM); - let int_action = signal::SigAction::new( - signal::SigHandler::Handler(handle_int), - signal::SaFlags::empty(), - mask_int); - - unsafe { - signal::sigaction(signal::Signal::SIGHUP, &hup_action).unwrap(); - signal::sigaction(signal::Signal::SIGUSR2, &hup_action).unwrap(); - signal::sigaction(signal::Signal::SIGINT, &int_action).unwrap(); - signal::sigaction(signal::Signal::SIGTERM, &int_action).unwrap(); - } - let binds: Vec = sock_addrs.iter().map(|sa| { TcpListener::bind(sa).unwrap() }).collect(); @@ -164,6 +170,29 @@ fn main() { let mut prog = Command::new(&program_and_args[0]); prog.args(&program_and_args[1..]); - run(binds, prog, number); + cfg.bind_fds = binds.into_iter().map(|b| { + let orig_fd = b.into_raw_fd(); + // Duplicate, which also clears the CLOEXEC flag + let fd = nix::unistd::dup(orig_fd).unwrap(); + println!("fd={} FD_CLOEXEC={}", fd, nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFD).unwrap()); + fd + }).collect(); + + prog.env("EINHORN_FD_COUNT", cfg.bind_fds.len().to_string()); + // This iterator destroys the TcpListeners + for (i, fd) in cfg.bind_fds.iter().enumerate() { + prog.env(format!("EINHORN_FD_{}", i), fd.to_string()); + } + cfg.prog = prog; + + //// Listen for signals (before any fork()) + println!("Registering signal handlers..."); + let signal_rx = chan_signal::notify(&[Signal::INT, + Signal::TERM, + //Signal::CHLD, // XXX: PR has been submitted + Signal::USR2, + Signal::HUP]); + + shepard(cfg, signal_rx); exit(0); } -- cgit v1.2.3