aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bin/einhyrningsinsctl.rs102
-rw-r--r--src/main.rs126
2 files changed, 131 insertions, 97 deletions
diff --git a/src/bin/einhyrningsinsctl.rs b/src/bin/einhyrningsinsctl.rs
index a3ba8f0..527eb61 100644
--- a/src/bin/einhyrningsinsctl.rs
+++ b/src/bin/einhyrningsinsctl.rs
@@ -1,20 +1,19 @@
-/*
- * einhyrningsinsctl: controller/shell for einhyrningsins
- * Copyright (C) 2016 Bryan Newbold <bnewbold@robocracy.org>
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
+// einhyrningsinsctl: controller/shell for einhyrningsins
+// Copyright (C) 2016 Bryan Newbold <bnewbold@robocracy.org>
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
#[macro_use]
extern crate json;
@@ -57,34 +56,43 @@ fn shell(ctrl_stream: UnixStream) {
match readline {
Ok(line) => {
rl.add_history_entry(&line);
- if line.is_empty() { continue; };
+ if line.is_empty() {
+ continue;
+ };
let mut chunks = line.split(' ');
let cmd = chunks.nth(0).unwrap();
let args = chunks.collect();
match send_msg(&mut reader, &mut writer, cmd, args) {
- Ok(s) => { println!("{}", s); },
+ Ok(s) => {
+ println!("{}", s);
+ }
Err(e) => {
println!("Error sending control message: {}", e);
exit(-1);
- },
+ }
}
- },
- Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => {
+ }
+ Err(ReadlineError::Interrupted) |
+ Err(ReadlineError::Eof) => {
println!("Caught kill signal (shutting down)");
- break
- },
+ break;
+ }
Err(err) => {
println!("Shell Error: {:?} (shutting down)", err);
- break
+ break;
}
}
}
- //drop(ctrl_stream);
+ // drop(ctrl_stream);
}
// This function sends a single request message down the writer, then waits for a reply on the
// reader and prints the result.
-fn send_msg(reader: &mut BufRead, writer: &mut Write, cmd: &str, args: Vec<&str>) -> io::Result<String> {
+fn send_msg(reader: &mut BufRead,
+ writer: &mut Write,
+ cmd: &str,
+ args: Vec<&str>)
+ -> io::Result<String> {
let mut buffer = String::new();
let mut arg_list = json::JsonValue::new_array();
@@ -96,15 +104,15 @@ fn send_msg(reader: &mut BufRead, writer: &mut Write, cmd: &str, args: Vec<&str>
"command" => cmd,
"args" => arg_list
};
- //println!("Sending: {}", req.dump());
+ // println!("Sending: {}", req.dump());
try!(writer.write_all(format!("{}\n", req.dump()).as_bytes()));
try!(writer.flush());
try!(reader.read_line(&mut buffer));
- //println!("Got: {}", buffer);
+ // println!("Got: {}", buffer);
let reply = match json::parse(&buffer) {
Ok(obj) => obj,
- Err(_) => { return Ok(buffer) },
+ Err(_) => return Ok(buffer),
};
Ok(match reply.as_str() {
Some(s) => s.to_string(),
@@ -125,13 +133,23 @@ fn main() {
let mut opts = Options::new();
opts.optflag("h", "help", "print this help menu");
opts.optflag("", "version", "print the version");
- opts.optopt("e", "execute", "submit this command instead (no shell)", "CMD");
- opts.optopt("d", "socket-path", "where to look for control socket (default: /tmp/einhorn.sock)", "PATH");
+ opts.optopt("e",
+ "execute",
+ "submit this command instead (no shell)",
+ "CMD");
+ opts.optopt("d",
+ "socket-path",
+ "where to look for control socket (default: /tmp/einhorn.sock)",
+ "PATH");
let matches = match opts.parse(&args[1..]) {
- Ok(m) => { m }
- Err(f) => { println!("{}", f.to_string()); print_usage(opts); exit(-1); }
- };
+ Ok(m) => m,
+ Err(f) => {
+ println!("{}", f.to_string());
+ print_usage(opts);
+ exit(-1);
+ }
+ };
if matches.opt_present("help") {
print_usage(opts);
@@ -143,7 +161,7 @@ fn main() {
return;
}
- // Bind to Control Socket
+ // Bind to Control Socket
let path_str = matches.opt_str("socket-path").unwrap_or("/tmp/einhorn.sock".to_string());
let ctrl_path = Path::new(&path_str);
if !ctrl_path.exists() {
@@ -151,30 +169,32 @@ fn main() {
println!("Is the master process running? Do you need to tell me the correct socket path?");
exit(-1);
}
- //println!("Connecting to control socket: {:?}", ctrl_path);
+ // println!("Connecting to control socket: {:?}", ctrl_path);
let ctrl_stream = match UnixStream::connect(ctrl_path) {
Ok(s) => s,
Err(e) => {
println!("Couldn't open socket [{}]: {}", path_str, e);
exit(-1);
- },
+ }
};
// Send a test message before continuing
send_msg(&mut BufReader::new(&ctrl_stream),
&mut BufWriter::new(&ctrl_stream),
"ehlo",
- vec![]).unwrap();
+ vec![])
+ .unwrap();
match matches.opt_str("execute") {
Some(cmd) => {
match send_msg(&mut BufReader::new(&ctrl_stream),
&mut BufWriter::new(&ctrl_stream),
- &cmd, vec![]) {
+ &cmd,
+ vec![]) {
Ok(reply) => println!("{}", reply),
Err(e) => println!("Communications error: {}", e),
}
- },
+ }
None => shell(ctrl_stream),
}
exit(0);
diff --git a/src/main.rs b/src/main.rs
index f88df40..65f1c68 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,23 +1,24 @@
-/*
- * einhyrningsins: graceful restarts for socket-based daemons
- * Copyright (C) 2016 Bryan Newbold <bnewbold@robocracy.org>
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#[macro_use] extern crate chan;
-#[macro_use] extern crate slog;
+// einhyrningsins: graceful restarts for socket-based daemons
+// Copyright (C) 2016 Bryan Newbold <bnewbold@robocracy.org>
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#[macro_use]
+extern crate chan;
+#[macro_use]
+extern crate slog;
extern crate slog_syslog;
extern crate slog_term;
extern crate json;
@@ -106,9 +107,9 @@ struct CtrlRequest {
#[derive(Copy, Clone, Debug, PartialEq)]
enum OffspringState {
- Infancy, // just started, waiting for ACK
+ Infancy, // just started, waiting for ACK
Healthy,
- Notified, // shutting down gracefully
+ Notified, // shutting down gracefully
Dead,
}
@@ -122,7 +123,6 @@ struct Offspring {
}
impl Offspring {
-
pub fn spawn(state: &mut EinState) -> Result<Offspring, String> {
let mut o = Offspring {
state: OffspringState::Infancy,
@@ -150,21 +150,23 @@ impl Offspring {
pub fn is_active(&self) -> bool {
match self.state {
- OffspringState::Infancy |
- OffspringState::Healthy |
- OffspringState::Notified => true,
+ OffspringState::Infancy | OffspringState::Healthy | OffspringState::Notified => true,
OffspringState::Dead => false,
}
}
pub fn kill(&mut self) {
- if !self.is_active() { return; }
+ if !self.is_active() {
+ return;
+ }
self.signal(Signal::KILL);
self.state = OffspringState::Dead;
}
pub fn terminate(&mut self, state: &mut EinState) {
- if !self.is_active() { return; }
+ if !self.is_active() {
+ return;
+ }
self.signal(Signal::TERM);
self.state = OffspringState::Notified;
let pid = self.process.id();
@@ -175,12 +177,14 @@ impl Offspring {
}
pub fn shutdown(&mut self, state: &mut EinState) {
- if !self.is_active() { return; }
+ if !self.is_active() {
+ return;
+ }
self.signal(Signal::USR2);
self.state = OffspringState::Notified;
let pid = self.process.id();
let t_tx = state.timer_tx.clone();
- self.timer_guard = Some(state.timer.schedule_with_delay(state.cfg.graceperiod , move || {
+ self.timer_guard = Some(state.timer.schedule_with_delay(state.cfg.graceperiod, move || {
t_tx.send(TimerAction::CheckShutdown(pid));
}));
}
@@ -205,13 +209,13 @@ impl Offspring {
warn!(self.log, "tried to send unexpected signal";
"signal" => format!("{:?}", sig));
return;
- },
+ }
};
nix::sys::signal::kill(self.process.id() as i32, nix_sig).unwrap();
}
}
-/* * * * * * * * Main Event Loop * * * * * * * */
+// * * * * * * * Main Event Loop * * * * * * *
fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
//// birth the initial set of offspring
@@ -442,7 +446,9 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
},
},
}
- if !run { break; }
+ if !run {
+ break;
+ }
}
info!(state.log, "reaping children";
@@ -455,7 +461,7 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) {
info!(state.log, "done, exiting");
}
-/* * * * * * * * Setup and CLI * * * * * * * */
+// * * * * * * * Setup and CLI * * * * * * *
fn print_usage(opts: Options) {
let brief = "usage:\teinhyrningsins [options] [--] program [program_args]";
@@ -483,9 +489,13 @@ fn main() {
opts.optopt("r", "retries", "how many times to attempt spawning", "RETRIES");
let matches = match opts.parse(&args[1..]) {
- Ok(m) => { m }
- Err(f) => { println!("{}\n", f.to_string()); print_usage(opts); exit(-1); }
- };
+ Ok(m) => m,
+ Err(f) => {
+ println!("{}\n", f.to_string());
+ print_usage(opts);
+ exit(-1);
+ }
+ };
if matches.opt_present("help") {
print_usage(opts);
@@ -519,9 +529,9 @@ fn main() {
let env_drops = matches.opt_strs("drop-env-var");
let ipv4_only = matches.opt_present("4");
let ipv6_only = matches.opt_present("6");
- let manual_ack = matches.opt_present("m");
- let verbose = matches.opt_present("verbose");
- let syslog = matches.opt_present("syslog");
+ let manual_ack = matches.opt_present("m");
+ let verbose = matches.opt_present("verbose");
+ let syslog = matches.opt_present("syslog");
let program_and_args = if !matches.free.is_empty() {
matches.free
@@ -579,7 +589,10 @@ fn main() {
let state = match init(cfg, ctrl_req_rx) {
Ok(s) => s,
- Err(e) => { println!("{}", e); exit(-1); },
+ Err(e) => {
+ println!("{}", e);
+ exit(-1);
+ }
};
//// Start Constrol Socket Thread
@@ -684,7 +697,7 @@ fn init(cfg: EinConfig, ctrl_req_rx: Receiver<CtrlRequest>) -> Result<EinState,
})
}
-/* * * * * * * * Control Socket Server * * * * * * * */
+// * * * * * * * Control Socket Server * * * * * * *
const CTRL_SHELL_USAGE: &'static str = r#"Command Listing:
@@ -713,9 +726,7 @@ fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>, log:
// Parse message
let req_action = if let Ok(msg) = json::parse(&rawline) {
match msg["command"].as_str() {
- Some("worker:ack") => {
- CtrlAction::ManualAck(msg["pid"].as_u32().unwrap())
- },
+ Some("worker:ack") => CtrlAction::ManualAck(msg["pid"].as_u32().unwrap()),
Some("signal") => {
CtrlAction::SigAll(match msg["args"][0].as_str() {
Some("SIGHUP") | Some("HUP") | Some("hup") => Signal::HUP,
@@ -732,9 +743,9 @@ fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>, log:
writer.write_all(b"\"Missing or unhandled 'signal'\"\n").unwrap();
writer.flush().unwrap();
continue;
- },
+ }
})
- },
+ }
Some("inc") => CtrlAction::Increment,
Some("dec") => CtrlAction::Decrement,
Some("status") => CtrlAction::Status,
@@ -744,25 +755,25 @@ fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>, log:
writer.write_all(b"\"Hi there!\"\n\r").unwrap();
writer.flush().unwrap();
continue;
- },
+ }
Some("help") => {
let escaped = json::stringify(json::JsonValue::from(CTRL_SHELL_USAGE));
writer.write_all(escaped.as_bytes()).unwrap();
writer.write_all(b"\n").unwrap();
writer.flush().unwrap();
continue;
- },
+ }
Some("version") => {
let ver = format!("\"einhyrningsinsctl {}\"\n", env!("CARGO_PKG_VERSION"));
writer.write_all(ver.as_bytes()).unwrap();
writer.flush().unwrap();
continue;
- },
+ }
Some(_) | None => {
writer.write_all(b"\"Missing or unhandled 'command'\"\n").unwrap();
writer.flush().unwrap();
continue;
- },
+ }
}
} else {
writer.write_all(b"\"Expected valid JSON!\"\n").unwrap();
@@ -771,8 +782,11 @@ fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>, log:
};
// Send request
- let (tx, rx): (Sender<String>, Receiver<String>) = chan::async();
- let req = CtrlRequest{ action: req_action, tx: tx };
+ let (tx, rx): (Sender<String>, Receiver<String>) = chan::async();
+ let req = CtrlRequest {
+ action: req_action,
+ tx: tx,
+ };
ctrl_req_tx.send(req);
// Send reply
@@ -787,19 +801,19 @@ fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>, log:
fn ctrl_socket_serve(listener: UnixListener, ctrl_req_tx: Sender<CtrlRequest>, log: slog::Logger) {
for conn in listener.incoming() {
- match conn{
+ match conn {
Ok(conn) => {
let tx = ctrl_req_tx.clone();
let conn_log = log.new(o!(
"client" => format!("{:?}", conn)));
info!(conn_log, "accepted connection");
thread::spawn(move || ctrl_socket_handle(conn, tx, conn_log));
- },
+ }
Err(err) => {
// TODO
error!(log, "control socket err: {}", err);
break;
- },
+ }
}
}
drop(listener);