aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs112
1 files changed, 105 insertions, 7 deletions
diff --git a/src/main.rs b/src/main.rs
index 4e994c3..6a96af1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -27,6 +27,8 @@ extern crate timer;
extern crate time;
extern crate chan_signal;
+use std::io::prelude::*;
+use std::io::{BufReader, BufWriter};
use std::env;
use std::u64;
use std::str::FromStr;
@@ -36,7 +38,8 @@ use std::process::Child;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::net::ToSocketAddrs;
-//use std::time::Instant;
+use std::os::unix::net::{UnixStream, UnixListener};
+use std::thread;
use time::Duration;
use std::collections::HashMap;
use getopts::Options;
@@ -45,6 +48,7 @@ use chan_signal::Signal;
use chan::{Sender, Receiver};
use std::os::unix::io::{RawFd, IntoRawFd};
+// TODO: split this into read-only config and mutable state
struct EinConfig {
childhood: Duration,
graceperiod: Duration,
@@ -55,6 +59,7 @@ struct EinConfig {
cmd: Command,
ipv4_only: bool,
ipv6_only: bool,
+ ctrl_req_rx: Receiver<CtrlRequest>,
}
#[derive(Copy, Clone, Debug, PartialEq)]
@@ -68,7 +73,6 @@ enum OffspringState {
struct Offspring {
state: OffspringState,
process: Child,
- // birthday: Instant, // specifies the generation
attempts: u64,
timer_guard: Option<timer::Guard>,
replaces: Option<u32>,
@@ -80,7 +84,6 @@ impl Offspring {
let mut o = Offspring {
state: OffspringState::Infancy,
process: cfg.cmd.spawn().expect("error spawning"),
- // birthday: Instant::now(),
attempts: 0,
timer_guard: None,
replaces: None,
@@ -138,10 +141,11 @@ impl Offspring {
return;
}
let nix_sig = match sig {
- Signal::HUP => nix::sys::signal::Signal::SIGHUP,
- Signal::INT => nix::sys::signal::Signal::SIGINT,
- Signal::TERM => nix::sys::signal::Signal::SIGTERM,
+ Signal::HUP => nix::sys::signal::Signal::SIGHUP,
+ Signal::INT => nix::sys::signal::Signal::SIGINT,
+ Signal::TERM => nix::sys::signal::Signal::SIGTERM,
Signal::KILL => nix::sys::signal::Signal::SIGKILL,
+ Signal::USR2 => nix::sys::signal::Signal::SIGUSR2,
_ => { println!("Unexpected signal: {:?}", sig); return; },
};
nix::sys::signal::kill(self.process.id() as i32, nix_sig).unwrap();
@@ -155,6 +159,22 @@ enum TimerAction {
CheckShutdown(u32),
}
+#[derive(Copy, Clone, Debug, PartialEq)]
+enum CtrlAction {
+ Increment,
+ Decrement,
+ SigAll(Signal),
+ ShutdownAll,
+ UpgradeAll,
+ Status,
+}
+
+#[derive(Clone, Debug, PartialEq)]
+struct CtrlRequest {
+ action: CtrlAction,
+ tx: Sender<String>,
+}
+
// This is the main event loop
fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
@@ -171,6 +191,9 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
println!("Spawned: {}", pid);
}
+ // Ugh, see: http://burntsushi.net/rustdoc/chan/macro.chan_select.html#failure-modes
+ let ctrl_req_rx = cfg.ctrl_req_rx.clone();
+
//// infinite select() loop over timers, signals
let mut run = true;
loop {
@@ -221,6 +244,38 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver<Signal>) {
}
},
},
+ ctrl_req_rx.recv() -> maybe_req =>
+ if let Some(req) = maybe_req { match req.action {
+ CtrlAction::Increment => {
+ req.tx.send(format!("UNIMPLEMENTED"));
+ },
+ CtrlAction::Decrement => {
+ req.tx.send(format!("UNIMPLEMENTED"));
+ },
+ CtrlAction::SigAll(sig) => {
+ for (_, o) in brood.iter_mut() {
+ o.signal(sig);
+ }
+ req.tx.send(format!("UNIMPLEMENTED"));
+ },
+ CtrlAction::ShutdownAll => {
+ let mut pid_list = vec![];
+ for (pid, o) in brood.iter_mut() {
+ if o.is_active() {
+ o.shutdown(&mut cfg, &mut timer, timer_tx.clone());
+ pid_list.push(pid);
+ }
+ }
+ req.tx.send(format!("UNIMPLEMENTED"));
+ },
+ CtrlAction::UpgradeAll => {
+ req.tx.send(format!("UNIMPLEMENTED"));
+ },
+ CtrlAction::Status => {
+ req.tx.send(format!("UNIMPLEMENTED"));
+ },
+ }
+ },
signal_rx.recv() -> sig => match sig.expect("Error with signal handler") {
Signal::CHLD => {
loop {
@@ -326,6 +381,7 @@ fn main() {
}
//// Parse Configuration
+ let (ctrl_req_tx, ctrl_req_rx): (Sender<CtrlRequest>, Receiver<CtrlRequest>) = chan::async();
let mut cfg = EinConfig{
count: 1,
childhood: Duration::seconds(3),
@@ -336,6 +392,7 @@ fn main() {
ipv6_only: matches.opt_present("6"),
manual_ack: matches.opt_present("m"),
cmd: Command::new(""),
+ ctrl_req_rx: ctrl_req_rx,
};
if let Some(n) = matches.opt_str("number") {
@@ -343,6 +400,11 @@ fn main() {
}
//// Bind Sockets
+
+ // Control socket first
+ let ctrl_listener = UnixListener::bind("/tmp/einhorn.sock").unwrap();
+ // XXX: set mode/permissions/owner?
+
// These will be tuples: (SocketAddr, SO_REUSEADDR, O_NONBLOCK)
let sock_confs: Vec<(SocketAddr, bool, bool)> = matches.opt_strs("bind").iter().map(|b| {
let mut r = false;
@@ -418,12 +480,48 @@ fn main() {
//// Listen for signals (before any fork())
println!("Registering signal handlers...");
+ // TODO: Should mask others here? START, etc?
let signal_rx = chan_signal::notify(&[Signal::INT,
Signal::TERM,
- Signal::CHLD, // XXX: PR has been submitted
+ Signal::CHLD, // NB: PR has been submitted
Signal::USR2,
Signal::HUP]);
+ //// Start Constrol Socket Thread
+ thread::spawn(move || ctrl_socket_serve(ctrl_listener, ctrl_req_tx));
+
shepard(cfg, signal_rx);
exit(0);
}
+
+fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>) {
+ let reader = BufReader::new(&stream);
+ let mut writer = BufWriter::new(&stream);
+ for line in reader.lines() {
+ let line = line.unwrap();
+ println!("Got message: {}", line);
+ let (tx, rx): (Sender<String>, Receiver<String>) = chan::async();
+ let req = CtrlRequest{ action: CtrlAction::Status, tx: tx };
+ ctrl_req_tx.send(req);
+ let resp = rx.recv().unwrap();
+ writer.write_all(resp.as_bytes()).unwrap();
+ }
+ stream.shutdown(std::net::Shutdown::Both).unwrap();
+}
+
+fn ctrl_socket_serve(listener: UnixListener, ctrl_req_tx: Sender<CtrlRequest>) {
+ for conn in listener.incoming() {
+ match conn{
+ Ok(conn) => {
+ let tx = ctrl_req_tx.clone();
+ thread::spawn(move || ctrl_socket_handle(conn, tx));
+ },
+ Err(err) => {
+ // TODO
+ println!("control socket err: {}", err);
+ break;
+ },
+ }
+ }
+ drop(listener);
+}