From 44bebada4207fd7fbe133eea834ed255de3f14ae Mon Sep 17 00:00:00 2001 From: bnewbold Date: Sun, 9 Oct 2016 00:46:59 -0700 Subject: start refactoring to chan-signals --- src/main.rs | 155 ++++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 92 insertions(+), 63 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index b7dee25..35968ea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,10 +16,15 @@ * along with this program. If not, see . */ +#[macro_use] +extern crate chan; + extern crate getopts; extern crate log; extern crate env_logger; extern crate nix; +extern crate timer; +extern crate chan_signal; use std::env; use std::u64; @@ -30,29 +35,65 @@ use std::process::Child; use std::net::SocketAddr; use std::net::TcpListener; use std::net::ToSocketAddrs; +use std::time::{Duration, Instant}; use getopts::Options; -use std::os::unix::io::IntoRawFd; -use nix::sys::signal; +use chan_signal::Signal; +use chan::{Sender, Receiver}; +use std::os::unix::io::{RawFd, IntoRawFd}; + +struct EinConfig { + childhood: Duration, + retries: u64, + count: u64, + bind_fds: Vec, + prog: Command, + //XXX:rpc_ask: Sender, + //XXX:rpc_reply: Receiver>, +} -fn run(binds: Vec, mut prog: Command, number: u64) { +enum OffspringState { + Expectant, // no process exist yet + Infancy, // just started, waiting for ACK + Healthy, + Sick, + Notified, // shutting down + Dead, +} - prog.env("EINHORN_FD_COUNT", binds.len().to_string()); - // This iterator destroys the TcpListeners - for (i, b) in binds.into_iter().enumerate() { - let orig_fd = b.into_raw_fd(); - // Duplicate, which also clears the CLOEXEC flag - //let fd = nix::fcntl::fcntl(nix::fcntl::FcntlArg::F_DUPFD(orig_fd)).unwrap(); - let fd = nix::unistd::dup(orig_fd).unwrap(); - println!("fd={} FD_CLOEXEC={}", fd, nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFD).unwrap()); - prog.env(format!("EINHORN_FD_{}", i), fd.to_string()); - // NB: is fd getting destroyed here? +struct Offspring { + state: OffspringState, + process: Option, + birthday: Instant, // specifies the generation + attempts: u64, +} + +// This is the main event loop +fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { + + // birth the initial set of offspring + + // create timer + let timer = timer::Timer::new(); + // XXX: these signatures are bogus + let (timer_tx, timer_rx): (Sender, Receiver) = chan::async(); + + // infinite select() loop over timers, signals, rpc + + loop { + chan_select! { + timer_rx.recv() => println!("Timer tick'd"), + signal_rx.recv() => { + println!("Signal received!"); + break; + } + } } let mut children: Vec = vec![]; - for _ in 0..number { + for _ in 0..cfg.count { println!("Running!"); - children.push(prog.spawn().expect("error spawning")); + children.push(cfg.prog.spawn().expect("error spawning")); } println!("Waiting for all children to die"); @@ -62,25 +103,6 @@ fn run(binds: Vec, mut prog: Command, number: u64) { println!("Done."); } -static mut interrupted: bool = false; - -extern fn handle_hup(_: i32) { - println!("This is where I would restart all children gracefully?"); -} - -extern fn handle_int(_: i32) { - let first = unsafe { - let tmp = !interrupted; - interrupted = true; - tmp - }; - if first { - println!("Waiting for childred to shutdown gracefully (Ctrl-C again to bail)"); - } else { - panic!(); - } -} - fn print_usage(opts: Options) { let brief = "usage:\teinhyrningsins [options] program"; println!(""); @@ -108,14 +130,22 @@ fn main() { return; } - let number: u64 = match matches.opt_str("number") { + //// Parse Configuration + let mut cfg = EinConfig{ + count: 1, + childhood: Duration::new(3, 0), + retries: 3, + bind_fds: vec![], + prog: Command::new(""), + }; + + cfg.count = match matches.opt_str("number") { Some(n) => u64::from_str(&n).expect("number arg should be an integer"), - None => 1 + None => 1 // XXX: duplicate default }; + //// Bind Sockets let sock_addrs: Vec = matches.opt_strs("bind").iter().map(|b| { - //let sa: SocketAddr = b.to_socket_addrs().unwrap().next().unwrap(); - //sa b.to_socket_addrs().unwrap().next().unwrap() }).collect(); @@ -133,30 +163,6 @@ fn main() { } builder.init().unwrap(); - println!("Registering signal handlers..."); - let mut mask_hup = signal::SigSet::empty(); - mask_hup.add(signal::Signal::SIGHUP); - mask_hup.add(signal::Signal::SIGUSR2); - let hup_action = signal::SigAction::new( - signal::SigHandler::Handler(handle_hup), - signal::SaFlags::empty(), - mask_hup); - - let mut mask_int = signal::SigSet::empty(); - mask_int.add(signal::Signal::SIGINT); - mask_int.add(signal::Signal::SIGTERM); - let int_action = signal::SigAction::new( - signal::SigHandler::Handler(handle_int), - signal::SaFlags::empty(), - mask_int); - - unsafe { - signal::sigaction(signal::Signal::SIGHUP, &hup_action).unwrap(); - signal::sigaction(signal::Signal::SIGUSR2, &hup_action).unwrap(); - signal::sigaction(signal::Signal::SIGINT, &int_action).unwrap(); - signal::sigaction(signal::Signal::SIGTERM, &int_action).unwrap(); - } - let binds: Vec = sock_addrs.iter().map(|sa| { TcpListener::bind(sa).unwrap() }).collect(); @@ -164,6 +170,29 @@ fn main() { let mut prog = Command::new(&program_and_args[0]); prog.args(&program_and_args[1..]); - run(binds, prog, number); + cfg.bind_fds = binds.into_iter().map(|b| { + let orig_fd = b.into_raw_fd(); + // Duplicate, which also clears the CLOEXEC flag + let fd = nix::unistd::dup(orig_fd).unwrap(); + println!("fd={} FD_CLOEXEC={}", fd, nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFD).unwrap()); + fd + }).collect(); + + prog.env("EINHORN_FD_COUNT", cfg.bind_fds.len().to_string()); + // This iterator destroys the TcpListeners + for (i, fd) in cfg.bind_fds.iter().enumerate() { + prog.env(format!("EINHORN_FD_{}", i), fd.to_string()); + } + cfg.prog = prog; + + //// Listen for signals (before any fork()) + println!("Registering signal handlers..."); + let signal_rx = chan_signal::notify(&[Signal::INT, + Signal::TERM, + //Signal::CHLD, // XXX: PR has been submitted + Signal::USR2, + Signal::HUP]); + + shepard(cfg, signal_rx); exit(0); } -- cgit v1.2.3