diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bin/einhyrningsinsctl.rs | 102 | ||||
-rw-r--r-- | src/main.rs | 126 |
2 files changed, 131 insertions, 97 deletions
diff --git a/src/bin/einhyrningsinsctl.rs b/src/bin/einhyrningsinsctl.rs index a3ba8f0..527eb61 100644 --- a/src/bin/einhyrningsinsctl.rs +++ b/src/bin/einhyrningsinsctl.rs @@ -1,20 +1,19 @@ -/* - * einhyrningsinsctl: controller/shell for einhyrningsins - * Copyright (C) 2016 Bryan Newbold <bnewbold@robocracy.org> - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ +// einhyrningsinsctl: controller/shell for einhyrningsins +// Copyright (C) 2016 Bryan Newbold <bnewbold@robocracy.org> +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// #[macro_use] extern crate json; @@ -57,34 +56,43 @@ fn shell(ctrl_stream: UnixStream) { match readline { Ok(line) => { rl.add_history_entry(&line); - if line.is_empty() { continue; }; + if line.is_empty() { + continue; + }; let mut chunks = line.split(' '); let cmd = chunks.nth(0).unwrap(); let args = chunks.collect(); match send_msg(&mut reader, &mut writer, cmd, args) { - Ok(s) => { println!("{}", s); }, + Ok(s) => { + println!("{}", s); + } Err(e) => { println!("Error sending control message: {}", e); exit(-1); - }, + } } - }, - Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => { + } + Err(ReadlineError::Interrupted) | + Err(ReadlineError::Eof) => { println!("Caught kill signal (shutting down)"); - break - }, + break; + } Err(err) => { println!("Shell Error: {:?} (shutting down)", err); - break + break; } } } - //drop(ctrl_stream); + // drop(ctrl_stream); } // This function sends a single request message down the writer, then waits for a reply on the // reader and prints the result. -fn send_msg(reader: &mut BufRead, writer: &mut Write, cmd: &str, args: Vec<&str>) -> io::Result<String> { +fn send_msg(reader: &mut BufRead, + writer: &mut Write, + cmd: &str, + args: Vec<&str>) + -> io::Result<String> { let mut buffer = String::new(); let mut arg_list = json::JsonValue::new_array(); @@ -96,15 +104,15 @@ fn send_msg(reader: &mut BufRead, writer: &mut Write, cmd: &str, args: Vec<&str> "command" => cmd, "args" => arg_list }; - //println!("Sending: {}", req.dump()); + // println!("Sending: {}", req.dump()); try!(writer.write_all(format!("{}\n", req.dump()).as_bytes())); try!(writer.flush()); try!(reader.read_line(&mut buffer)); - //println!("Got: {}", buffer); + // println!("Got: {}", buffer); let reply = match json::parse(&buffer) { Ok(obj) => obj, - Err(_) => { return Ok(buffer) }, + Err(_) => return Ok(buffer), }; Ok(match reply.as_str() { Some(s) => s.to_string(), @@ -125,13 +133,23 @@ fn main() { let mut opts = Options::new(); opts.optflag("h", "help", "print this help menu"); opts.optflag("", "version", "print the version"); - opts.optopt("e", "execute", "submit this command instead (no shell)", "CMD"); - opts.optopt("d", "socket-path", "where to look for control socket (default: /tmp/einhorn.sock)", "PATH"); + opts.optopt("e", + "execute", + "submit this command instead (no shell)", + "CMD"); + opts.optopt("d", + "socket-path", + "where to look for control socket (default: /tmp/einhorn.sock)", + "PATH"); let matches = match opts.parse(&args[1..]) { - Ok(m) => { m } - Err(f) => { println!("{}", f.to_string()); print_usage(opts); exit(-1); } - }; + Ok(m) => m, + Err(f) => { + println!("{}", f.to_string()); + print_usage(opts); + exit(-1); + } + }; if matches.opt_present("help") { print_usage(opts); @@ -143,7 +161,7 @@ fn main() { return; } - // Bind to Control Socket + // Bind to Control Socket let path_str = matches.opt_str("socket-path").unwrap_or("/tmp/einhorn.sock".to_string()); let ctrl_path = Path::new(&path_str); if !ctrl_path.exists() { @@ -151,30 +169,32 @@ fn main() { println!("Is the master process running? Do you need to tell me the correct socket path?"); exit(-1); } - //println!("Connecting to control socket: {:?}", ctrl_path); + // println!("Connecting to control socket: {:?}", ctrl_path); let ctrl_stream = match UnixStream::connect(ctrl_path) { Ok(s) => s, Err(e) => { println!("Couldn't open socket [{}]: {}", path_str, e); exit(-1); - }, + } }; // Send a test message before continuing send_msg(&mut BufReader::new(&ctrl_stream), &mut BufWriter::new(&ctrl_stream), "ehlo", - vec![]).unwrap(); + vec![]) + .unwrap(); match matches.opt_str("execute") { Some(cmd) => { match send_msg(&mut BufReader::new(&ctrl_stream), &mut BufWriter::new(&ctrl_stream), - &cmd, vec![]) { + &cmd, + vec![]) { Ok(reply) => println!("{}", reply), Err(e) => println!("Communications error: {}", e), } - }, + } None => shell(ctrl_stream), } exit(0); diff --git a/src/main.rs b/src/main.rs index f88df40..65f1c68 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,24 @@ -/* - * einhyrningsins: graceful restarts for socket-based daemons - * Copyright (C) 2016 Bryan Newbold <bnewbold@robocracy.org> - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#[macro_use] extern crate chan; -#[macro_use] extern crate slog; +// einhyrningsins: graceful restarts for socket-based daemons +// Copyright (C) 2016 Bryan Newbold <bnewbold@robocracy.org> +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#[macro_use] +extern crate chan; +#[macro_use] +extern crate slog; extern crate slog_syslog; extern crate slog_term; extern crate json; @@ -106,9 +107,9 @@ struct CtrlRequest { #[derive(Copy, Clone, Debug, PartialEq)] enum OffspringState { - Infancy, // just started, waiting for ACK + Infancy, // just started, waiting for ACK Healthy, - Notified, // shutting down gracefully + Notified, // shutting down gracefully Dead, } @@ -122,7 +123,6 @@ struct Offspring { } impl Offspring { - pub fn spawn(state: &mut EinState) -> Result<Offspring, String> { let mut o = Offspring { state: OffspringState::Infancy, @@ -150,21 +150,23 @@ impl Offspring { pub fn is_active(&self) -> bool { match self.state { - OffspringState::Infancy | - OffspringState::Healthy | - OffspringState::Notified => true, + OffspringState::Infancy | OffspringState::Healthy | OffspringState::Notified => true, OffspringState::Dead => false, } } pub fn kill(&mut self) { - if !self.is_active() { return; } + if !self.is_active() { + return; + } self.signal(Signal::KILL); self.state = OffspringState::Dead; } pub fn terminate(&mut self, state: &mut EinState) { - if !self.is_active() { return; } + if !self.is_active() { + return; + } self.signal(Signal::TERM); self.state = OffspringState::Notified; let pid = self.process.id(); @@ -175,12 +177,14 @@ impl Offspring { } pub fn shutdown(&mut self, state: &mut EinState) { - if !self.is_active() { return; } + if !self.is_active() { + return; + } self.signal(Signal::USR2); self.state = OffspringState::Notified; let pid = self.process.id(); let t_tx = state.timer_tx.clone(); - self.timer_guard = Some(state.timer.schedule_with_delay(state.cfg.graceperiod , move || { + self.timer_guard = Some(state.timer.schedule_with_delay(state.cfg.graceperiod, move || { t_tx.send(TimerAction::CheckShutdown(pid)); })); } @@ -205,13 +209,13 @@ impl Offspring { warn!(self.log, "tried to send unexpected signal"; "signal" => format!("{:?}", sig)); return; - }, + } }; nix::sys::signal::kill(self.process.id() as i32, nix_sig).unwrap(); } } -/* * * * * * * * Main Event Loop * * * * * * * */ +// * * * * * * * Main Event Loop * * * * * * * fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) { //// birth the initial set of offspring @@ -442,7 +446,9 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) { }, }, } - if !run { break; } + if !run { + break; + } } info!(state.log, "reaping children"; @@ -455,7 +461,7 @@ fn shepard(mut state: EinState, signal_rx: Receiver<Signal>) { info!(state.log, "done, exiting"); } -/* * * * * * * * Setup and CLI * * * * * * * */ +// * * * * * * * Setup and CLI * * * * * * * fn print_usage(opts: Options) { let brief = "usage:\teinhyrningsins [options] [--] program [program_args]"; @@ -483,9 +489,13 @@ fn main() { opts.optopt("r", "retries", "how many times to attempt spawning", "RETRIES"); let matches = match opts.parse(&args[1..]) { - Ok(m) => { m } - Err(f) => { println!("{}\n", f.to_string()); print_usage(opts); exit(-1); } - }; + Ok(m) => m, + Err(f) => { + println!("{}\n", f.to_string()); + print_usage(opts); + exit(-1); + } + }; if matches.opt_present("help") { print_usage(opts); @@ -519,9 +529,9 @@ fn main() { let env_drops = matches.opt_strs("drop-env-var"); let ipv4_only = matches.opt_present("4"); let ipv6_only = matches.opt_present("6"); - let manual_ack = matches.opt_present("m"); - let verbose = matches.opt_present("verbose"); - let syslog = matches.opt_present("syslog"); + let manual_ack = matches.opt_present("m"); + let verbose = matches.opt_present("verbose"); + let syslog = matches.opt_present("syslog"); let program_and_args = if !matches.free.is_empty() { matches.free @@ -579,7 +589,10 @@ fn main() { let state = match init(cfg, ctrl_req_rx) { Ok(s) => s, - Err(e) => { println!("{}", e); exit(-1); }, + Err(e) => { + println!("{}", e); + exit(-1); + } }; //// Start Constrol Socket Thread @@ -684,7 +697,7 @@ fn init(cfg: EinConfig, ctrl_req_rx: Receiver<CtrlRequest>) -> Result<EinState, }) } -/* * * * * * * * Control Socket Server * * * * * * * */ +// * * * * * * * Control Socket Server * * * * * * * const CTRL_SHELL_USAGE: &'static str = r#"Command Listing: @@ -713,9 +726,7 @@ fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>, log: // Parse message let req_action = if let Ok(msg) = json::parse(&rawline) { match msg["command"].as_str() { - Some("worker:ack") => { - CtrlAction::ManualAck(msg["pid"].as_u32().unwrap()) - }, + Some("worker:ack") => CtrlAction::ManualAck(msg["pid"].as_u32().unwrap()), Some("signal") => { CtrlAction::SigAll(match msg["args"][0].as_str() { Some("SIGHUP") | Some("HUP") | Some("hup") => Signal::HUP, @@ -732,9 +743,9 @@ fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>, log: writer.write_all(b"\"Missing or unhandled 'signal'\"\n").unwrap(); writer.flush().unwrap(); continue; - }, + } }) - }, + } Some("inc") => CtrlAction::Increment, Some("dec") => CtrlAction::Decrement, Some("status") => CtrlAction::Status, @@ -744,25 +755,25 @@ fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>, log: writer.write_all(b"\"Hi there!\"\n\r").unwrap(); writer.flush().unwrap(); continue; - }, + } Some("help") => { let escaped = json::stringify(json::JsonValue::from(CTRL_SHELL_USAGE)); writer.write_all(escaped.as_bytes()).unwrap(); writer.write_all(b"\n").unwrap(); writer.flush().unwrap(); continue; - }, + } Some("version") => { let ver = format!("\"einhyrningsinsctl {}\"\n", env!("CARGO_PKG_VERSION")); writer.write_all(ver.as_bytes()).unwrap(); writer.flush().unwrap(); continue; - }, + } Some(_) | None => { writer.write_all(b"\"Missing or unhandled 'command'\"\n").unwrap(); writer.flush().unwrap(); continue; - }, + } } } else { writer.write_all(b"\"Expected valid JSON!\"\n").unwrap(); @@ -771,8 +782,11 @@ fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>, log: }; // Send request - let (tx, rx): (Sender<String>, Receiver<String>) = chan::async(); - let req = CtrlRequest{ action: req_action, tx: tx }; + let (tx, rx): (Sender<String>, Receiver<String>) = chan::async(); + let req = CtrlRequest { + action: req_action, + tx: tx, + }; ctrl_req_tx.send(req); // Send reply @@ -787,19 +801,19 @@ fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender<CtrlRequest>, log: fn ctrl_socket_serve(listener: UnixListener, ctrl_req_tx: Sender<CtrlRequest>, log: slog::Logger) { for conn in listener.incoming() { - match conn{ + match conn { Ok(conn) => { let tx = ctrl_req_tx.clone(); let conn_log = log.new(o!( "client" => format!("{:?}", conn))); info!(conn_log, "accepted connection"); thread::spawn(move || ctrl_socket_handle(conn, tx, conn_log)); - }, + } Err(err) => { // TODO error!(log, "control socket err: {}", err); break; - }, + } } } drop(listener); |