aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbnewbold <bnewbold@robocracy.org>2016-10-09 00:46:59 -0700
committerbnewbold <bnewbold@robocracy.org>2016-10-09 00:46:59 -0700
commit44bebada4207fd7fbe133eea834ed255de3f14ae (patch)
tree9445e481d72b02a7200c176158aedbd14fc4fac3
parent639fc1f0bd0be25c25218e8e7ed2e90e3dc5dcfb (diff)
downloadeinhyrningsins-44bebada4207fd7fbe133eea834ed255de3f14ae.tar.gz
einhyrningsins-44bebada4207fd7fbe133eea834ed255de3f14ae.zip
start refactoring to chan-signals
-rw-r--r--Cargo.lock108
-rw-r--r--Cargo.toml4
-rw-r--r--src/main.rs155
3 files changed, 204 insertions, 63 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 3a0e669..d04004f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2,10 +2,14 @@
name = "einhyrningsins"
version = "0.1.0"
dependencies = [
+ "chan 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
+ "chan-signal 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "chrono 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"getopts 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"nix 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "timer 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -17,6 +21,19 @@ dependencies = [
]
[[package]]
+name = "bit-set"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "bit-vec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "bit-vec"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
name = "bitflags"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -27,6 +44,34 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
+name = "chan"
+version = "0.1.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "chan-signal"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "bit-set 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "chan 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "chrono"
+version = "0.2.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "num 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
+ "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "env_logger"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -50,6 +95,11 @@ dependencies = [
]
[[package]]
+name = "lazy_static"
+version = "0.1.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
name = "libc"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -81,6 +131,46 @@ dependencies = [
]
[[package]]
+name = "num"
+version = "0.1.36"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "num-integer 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num-iter 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num-traits 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "num-integer"
+version = "0.1.32"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "num-traits 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "num-iter"
+version = "0.1.32"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "num-integer 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num-traits 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "num-traits"
+version = "0.1.36"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "rand"
+version = "0.3.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "regex"
version = "0.1.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -128,6 +218,24 @@ dependencies = [
]
[[package]]
+name = "time"
+version = "0.1.35"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)",
+ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "timer"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "chrono 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "utf8-ranges"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index be7fd7f..4fcdd2c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,3 +8,7 @@ nix = "0.7"
log = "0.3"
env_logger = "0.3"
getopts = "^0.2"
+timer = "0.1"
+chrono = "0.2"
+chan = "0.1"
+chan-signal = "0.1"
diff --git a/src/main.rs b/src/main.rs
index b7dee25..35968ea 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,10 +16,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#[macro_use]
+extern crate chan;
+
extern crate getopts;
extern crate log;
extern crate env_logger;
extern crate nix;
+extern crate timer;
+extern crate chan_signal;
use std::env;
use std::u64;
@@ -30,29 +35,65 @@ use std::process::Child;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::net::ToSocketAddrs;
+use std::time::{Duration, Instant};
use getopts::Options;
-use std::os::unix::io::IntoRawFd;
-use nix::sys::signal;
+use chan_signal::Signal;
+use chan::{Sender, Receiver};
+use std::os::unix::io::{RawFd, IntoRawFd};
+
+struct EinConfig {
+ childhood: Duration,
+ retries: u64,
+ count: u64,
+ bind_fds: Vec<RawFd>,
+ prog: Command,
+ //XXX:rpc_ask: Sender<String>,
+ //XXX:rpc_reply: Receiver<Result<String, String>>,
+}
-fn run(binds: Vec<TcpListener>, mut prog: Command, number: u64) {
+enum OffspringState {
+ Expectant, // no process exist yet
+ Infancy, // just started, waiting for ACK
+ Healthy,
+ Sick,
+ Notified, // shutting down
+ Dead,
+}
- prog.env("EINHORN_FD_COUNT", binds.len().to_string());
- // This iterator destroys the TcpListeners
- for (i, b) in binds.into_iter().enumerate() {
- let orig_fd = b.into_raw_fd();
- // Duplicate, which also clears the CLOEXEC flag
- //let fd = nix::fcntl::fcntl(nix::fcntl::FcntlArg::F_DUPFD(orig_fd)).unwrap();
- let fd = nix::unistd::dup(orig_fd).unwrap();
- println!("fd={} FD_CLOEXEC={}", fd, nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFD).unwrap());
- prog.env(format!("EINHORN_FD_{}", i), fd.to_string());
- // NB: is fd getting destroyed here?
+struct Offspring {
+ state: OffspringState,
+ process: Option<Child>,
+ birthday: Instant, // specifies the generation
+ attempts: u64,
+}
+
+// This is the main event loop
+fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
+
+ // birth the initial set of offspring
+
+ // create timer
+ let timer = timer::Timer::new();
+ // XXX: these signatures are bogus
+ let (timer_tx, timer_rx): (Sender<u64>, Receiver<u64>) = chan::async();
+
+ // infinite select() loop over timers, signals, rpc
+
+ loop {
+ chan_select! {
+ timer_rx.recv() => println!("Timer tick'd"),
+ signal_rx.recv() => {
+ println!("Signal received!");
+ break;
+ }
+ }
}
let mut children: Vec<Child> = vec![];
- for _ in 0..number {
+ for _ in 0..cfg.count {
println!("Running!");
- children.push(prog.spawn().expect("error spawning"));
+ children.push(cfg.prog.spawn().expect("error spawning"));
}
println!("Waiting for all children to die");
@@ -62,25 +103,6 @@ fn run(binds: Vec<TcpListener>, mut prog: Command, number: u64) {
println!("Done.");
}
-static mut interrupted: bool = false;
-
-extern fn handle_hup(_: i32) {
- println!("This is where I would restart all children gracefully?");
-}
-
-extern fn handle_int(_: i32) {
- let first = unsafe {
- let tmp = !interrupted;
- interrupted = true;
- tmp
- };
- if first {
- println!("Waiting for childred to shutdown gracefully (Ctrl-C again to bail)");
- } else {
- panic!();
- }
-}
-
fn print_usage(opts: Options) {
let brief = "usage:\teinhyrningsins [options] program";
println!("");
@@ -108,14 +130,22 @@ fn main() {
return;
}
- let number: u64 = match matches.opt_str("number") {
+ //// Parse Configuration
+ let mut cfg = EinConfig{
+ count: 1,
+ childhood: Duration::new(3, 0),
+ retries: 3,
+ bind_fds: vec![],
+ prog: Command::new(""),
+ };
+
+ cfg.count = match matches.opt_str("number") {
Some(n) => u64::from_str(&n).expect("number arg should be an integer"),
- None => 1
+ None => 1 // XXX: duplicate default
};
+ //// Bind Sockets
let sock_addrs: Vec<SocketAddr> = matches.opt_strs("bind").iter().map(|b| {
- //let sa: SocketAddr = b.to_socket_addrs().unwrap().next().unwrap();
- //sa
b.to_socket_addrs().unwrap().next().unwrap()
}).collect();
@@ -133,30 +163,6 @@ fn main() {
}
builder.init().unwrap();
- println!("Registering signal handlers...");
- let mut mask_hup = signal::SigSet::empty();
- mask_hup.add(signal::Signal::SIGHUP);
- mask_hup.add(signal::Signal::SIGUSR2);
- let hup_action = signal::SigAction::new(
- signal::SigHandler::Handler(handle_hup),
- signal::SaFlags::empty(),
- mask_hup);
-
- let mut mask_int = signal::SigSet::empty();
- mask_int.add(signal::Signal::SIGINT);
- mask_int.add(signal::Signal::SIGTERM);
- let int_action = signal::SigAction::new(
- signal::SigHandler::Handler(handle_int),
- signal::SaFlags::empty(),
- mask_int);
-
- unsafe {
- signal::sigaction(signal::Signal::SIGHUP, &hup_action).unwrap();
- signal::sigaction(signal::Signal::SIGUSR2, &hup_action).unwrap();
- signal::sigaction(signal::Signal::SIGINT, &int_action).unwrap();
- signal::sigaction(signal::Signal::SIGTERM, &int_action).unwrap();
- }
-
let binds: Vec<TcpListener> = sock_addrs.iter().map(|sa| {
TcpListener::bind(sa).unwrap()
}).collect();
@@ -164,6 +170,29 @@ fn main() {
let mut prog = Command::new(&program_and_args[0]);
prog.args(&program_and_args[1..]);
- run(binds, prog, number);
+ cfg.bind_fds = binds.into_iter().map(|b| {
+ let orig_fd = b.into_raw_fd();
+ // Duplicate, which also clears the CLOEXEC flag
+ let fd = nix::unistd::dup(orig_fd).unwrap();
+ println!("fd={} FD_CLOEXEC={}", fd, nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFD).unwrap());
+ fd
+ }).collect();
+
+ prog.env("EINHORN_FD_COUNT", cfg.bind_fds.len().to_string());
+ // This iterator destroys the TcpListeners
+ for (i, fd) in cfg.bind_fds.iter().enumerate() {
+ prog.env(format!("EINHORN_FD_{}", i), fd.to_string());
+ }
+ cfg.prog = prog;
+
+ //// Listen for signals (before any fork())
+ println!("Registering signal handlers...");
+ let signal_rx = chan_signal::notify(&[Signal::INT,
+ Signal::TERM,
+ //Signal::CHLD, // XXX: PR has been submitted
+ Signal::USR2,
+ Signal::HUP]);
+
+ shepard(cfg, signal_rx);
exit(0);
}