aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bin/einhyrningsinsctl.rs2
-rw-r--r--src/main.rs162
2 files changed, 98 insertions, 66 deletions
diff --git a/src/bin/einhyrningsinsctl.rs b/src/bin/einhyrningsinsctl.rs
index 527eb61..20981f8 100644
--- a/src/bin/einhyrningsinsctl.rs
+++ b/src/bin/einhyrningsinsctl.rs
@@ -194,7 +194,7 @@ fn main() {
Ok(reply) => println!("{}", reply),
Err(e) => println!("Communications error: {}", e),
}
- }
+ }
None => shell(ctrl_stream),
}
exit(0);
diff --git a/src/main.rs b/src/main.rs
index ca6e033..f2a0b2a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -218,7 +218,7 @@ impl Offspring {
// * * * * * * * Main Event Loop * * * * * * *
fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
- //// birth the initial set of offspring
+ /// birth the initial set of offspring
let mut brood: HashMap<u32, Offspring> = HashMap::new();
for _ in 0..state.cfg.count {
let o = Offspring::spawn(&mut state).unwrap();
@@ -230,7 +230,7 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
let ctrl_req_rx = state.ctrl_req_rx.clone();
let timer_rx = state.timer_rx.clone();
- //// infinite select() loop over timers, signals
+ /// infinite select() loop over timers, signals
let mut run = true;
loop {
chan_select! {
@@ -289,7 +289,9 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
let o = Offspring::spawn(&mut state).unwrap();
let pid = o.process.id();
brood.insert(pid, o);
- req.tx.send(format!("Spawned! Went from {} to {}", state.cfg.count, state.cfg.count+1));
+ req.tx.send(format!("Spawned! Went from {} to {}",
+ state.cfg.count,
+ state.cfg.count+1));
state.cfg.count += 1;
},
CtrlAction::Decrement => {
@@ -301,7 +303,9 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
for (_, o) in &mut brood {
if o.is_active() {
o.shutdown(&mut state);
- req.tx.send(format!("Notified! Went from {} to {}", state.cfg.count, state.cfg.count-1));
+ req.tx.send(format!("Notified! Went from {} to {}",
+ state.cfg.count,
+ state.cfg.count-1));
state.cfg.count -= 1;
done = true;
break;
@@ -512,7 +516,7 @@ fn main() {
exit(-1);
}
- //// Parse Configuration
+ /// Parse Configuration
let path_str = matches.opt_str("socket-path").unwrap_or("/tmp/einhorn.sock".to_string());
let count = match matches.opt_str("number") {
@@ -571,7 +575,7 @@ fn main() {
let (ctrl_req_tx, ctrl_req_rx): (Sender<CtrlRequest>, Receiver<CtrlRequest>) = chan::async();
- //// Listen for signals (before any fork())
+ /// Listen for signals (before any fork())
println!("Registering signal handlers...");
let signal_rx = chan_signal::notify(&[Signal::HUP,
Signal::INT,
@@ -595,11 +599,11 @@ fn main() {
}
};
- //// Start Constrol Socket Thread
+ /// Start Constrol Socket Thread
let ctrl_log = state.log.clone();
thread::spawn(move || ctrl_socket_serve(ctrl_listener, ctrl_req_tx, ctrl_log));
- //// State Event Loop
+ /// State Event Loop
shepard(state, signal_rx);
exit(0);
}
@@ -607,51 +611,73 @@ fn main() {
// Initializes config into state
fn init(cfg: EinConfig, ctrl_req_rx: Receiver<CtrlRequest>) -> Result<EinState, String> {
- //// Configure logging
- let term_drain = slog::level_filter(
- if cfg.verbose { slog::Level::Debug } else { slog::Level::Info },
- slog_term::streamer().async().auto_color().compact().build());
- let syslog_drain = slog::level_filter(
- slog::Level::Warning,
- slog_syslog::unix_3164(slog_syslog::Facility::LOG_DAEMON));
+ /// Configure logging
+ let term_drain =
+ slog::level_filter(if cfg.verbose {
+ slog::Level::Debug
+ } else {
+ slog::Level::Info
+ },
+ slog_term::streamer().async().auto_color().compact().build());
+ let syslog_drain =
+ slog::level_filter(slog::Level::Warning,
+ slog_syslog::unix_3164(slog_syslog::Facility::LOG_DAEMON));
// XXX: cfg.syslog
- let log_root = slog::Logger::root(
- slog::duplicate(term_drain, syslog_drain).ignore_err(),
- o!("version" => env!("CARGO_PKG_VERSION")));
+ let log_root = slog::Logger::root(slog::duplicate(term_drain, syslog_drain).ignore_err(),
+ o!("version" => env!("CARGO_PKG_VERSION")));
// These will be tuples: (SocketAddr, SO_REUSEADDR, O_NONBLOCK)
- let sock_confs: Vec<(SocketAddr, bool, bool)> = cfg.bind_slugs.iter().map(|b| {
- let mut r = false;
- let mut n = false;
- let mut addr_chunks = b.split(',');
- let sock_str = addr_chunks.next().unwrap(); // safe
- let mut sock_addrs = sock_str.to_socket_addrs().unwrap();
- // ugly
- let sock = if cfg.ipv4_only {
- let mut sock_addrs = sock_addrs.filter(
- |sa| if let SocketAddr::V4(_) = *sa { true } else { false });
- sock_addrs.next().expect("Couldn't bind as IPv4")
- } else if cfg.ipv6_only {
- let mut sock_addrs = sock_addrs.filter(
- |sa| if let SocketAddr::V6(_) = *sa { true } else { false });
- sock_addrs.next().expect("Couldn't bind as IPv6")
- } else {
- sock_addrs.next().expect("Couldn't bind socket")
- };
- for subarg in addr_chunks { match subarg {
- "r" => r = true,
- "n" => n = true,
- "" => (),
- _ => { println!("Unknown socket arg '{}', I only know about 'n' and 'r'. Try --help", subarg);
- exit(-1); },
- }}
- (sock, r, n)
- }).collect();
-
- let binds: Vec<(TcpListener, bool, bool)> = sock_confs.iter().map(|t| {
- let sa = t.0; let r = t.1; let n = t.2; // ugly
- (TcpListener::bind(sa).unwrap(), r, n)
- }).collect();
+ let sock_confs: Vec<(SocketAddr, bool, bool)> = cfg.bind_slugs
+ .iter()
+ .map(|b| {
+ let mut r = false;
+ let mut n = false;
+ let mut addr_chunks = b.split(',');
+ let sock_str = addr_chunks.next().unwrap(); // safe
+ let mut sock_addrs = sock_str.to_socket_addrs().unwrap();
+ // ugly
+ let sock = if cfg.ipv4_only {
+ let mut sock_addrs = sock_addrs.filter(|sa| if let SocketAddr::V4(_) = *sa {
+ true
+ } else {
+ false
+ });
+ sock_addrs.next().expect("Couldn't bind as IPv4")
+ } else if cfg.ipv6_only {
+ let mut sock_addrs = sock_addrs.filter(|sa| if let SocketAddr::V6(_) = *sa {
+ true
+ } else {
+ false
+ });
+ sock_addrs.next().expect("Couldn't bind as IPv6")
+ } else {
+ sock_addrs.next().expect("Couldn't bind socket")
+ };
+ for subarg in addr_chunks {
+ match subarg {
+ "r" => r = true,
+ "n" => n = true,
+ "" => (),
+ _ => {
+ println!("Unknown socket arg '{}', I only know about 'n' and 'r'. Try \
+ --help",
+ subarg);
+ exit(-1);
+ }
+ }
+ }
+ (sock, r, n)
+ })
+ .collect();
+
+ let binds: Vec<(TcpListener, bool, bool)> = sock_confs.iter()
+ .map(|t| {
+ let sa = t.0;
+ let r = t.1;
+ let n = t.2; // ugly
+ (TcpListener::bind(sa).unwrap(), r, n)
+ })
+ .collect();
let mut cmd = Command::new(cfg.program.clone());
cmd.args(&cfg.program_args);
@@ -659,22 +685,28 @@ fn init(cfg: EinConfig, ctrl_req_rx: Receiver<CtrlRequest>) -> Result<EinState,
cmd.env_remove(var);
}
- let bind_fds: Vec<RawFd> = binds.into_iter().map(|t| {
- let b = t.0; let r = t.1; let n = t.2; // ugly
- let orig_fd = b.into_raw_fd();
- // Duplicate, which also clears the CLOEXEC flag
- let fd = nix::unistd::dup(orig_fd).unwrap();
- if r {
- nix::sys::socket::setsockopt(fd, nix::sys::socket::sockopt::ReuseAddr, &true).unwrap();
- }
- if n {
- nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::O_NONBLOCK)).unwrap();
- }
- debug!(log_root, "bound socket";
- "fd" => fd,
+ let bind_fds: Vec<RawFd> = binds.into_iter()
+ .map(|t| {
+ let b = t.0;
+ let r = t.1;
+ let n = t.2; // ugly
+ let orig_fd = b.into_raw_fd();
+ // Duplicate, which also clears the CLOEXEC flag
+ let fd = nix::unistd::dup(orig_fd).unwrap();
+ if r {
+ nix::sys::socket::setsockopt(fd, nix::sys::socket::sockopt::ReuseAddr, &true)
+ .unwrap();
+ }
+ if n {
+ nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::O_NONBLOCK))
+ .unwrap();
+ }
+ debug!(log_root, "bound socket";
+ "fd" => fd,
"FD_CLOEXEC" => nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_GETFD).unwrap());
- fd
- }).collect();
+ fd
+ })
+ .collect();
cmd.env("EINHORN_FD_COUNT", bind_fds.len().to_string());
// This iterator destroys the TcpListeners