From a0e2f837f7ffc33b125f0036d01f546a1f8ac9d6 Mon Sep 17 00:00:00 2001 From: bnewbold Date: Tue, 11 Oct 2016 21:51:32 -0700 Subject: work in progress on control socket --- src/main.rs | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 105 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index 4e994c3..6a96af1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,6 +27,8 @@ extern crate timer; extern crate time; extern crate chan_signal; +use std::io::prelude::*; +use std::io::{BufReader, BufWriter}; use std::env; use std::u64; use std::str::FromStr; @@ -36,7 +38,8 @@ use std::process::Child; use std::net::SocketAddr; use std::net::TcpListener; use std::net::ToSocketAddrs; -//use std::time::Instant; +use std::os::unix::net::{UnixStream, UnixListener}; +use std::thread; use time::Duration; use std::collections::HashMap; use getopts::Options; @@ -45,6 +48,7 @@ use chan_signal::Signal; use chan::{Sender, Receiver}; use std::os::unix::io::{RawFd, IntoRawFd}; +// TODO: split this into read-only config and mutable state struct EinConfig { childhood: Duration, graceperiod: Duration, @@ -55,6 +59,7 @@ struct EinConfig { cmd: Command, ipv4_only: bool, ipv6_only: bool, + ctrl_req_rx: Receiver, } #[derive(Copy, Clone, Debug, PartialEq)] @@ -68,7 +73,6 @@ enum OffspringState { struct Offspring { state: OffspringState, process: Child, - // birthday: Instant, // specifies the generation attempts: u64, timer_guard: Option, replaces: Option, @@ -80,7 +84,6 @@ impl Offspring { let mut o = Offspring { state: OffspringState::Infancy, process: cfg.cmd.spawn().expect("error spawning"), - // birthday: Instant::now(), attempts: 0, timer_guard: None, replaces: None, @@ -138,10 +141,11 @@ impl Offspring { return; } let nix_sig = match sig { - Signal::HUP => nix::sys::signal::Signal::SIGHUP, - Signal::INT => nix::sys::signal::Signal::SIGINT, - Signal::TERM => nix::sys::signal::Signal::SIGTERM, + Signal::HUP => nix::sys::signal::Signal::SIGHUP, + Signal::INT => nix::sys::signal::Signal::SIGINT, + Signal::TERM => nix::sys::signal::Signal::SIGTERM, Signal::KILL => nix::sys::signal::Signal::SIGKILL, + Signal::USR2 => nix::sys::signal::Signal::SIGUSR2, _ => { println!("Unexpected signal: {:?}", sig); return; }, }; nix::sys::signal::kill(self.process.id() as i32, nix_sig).unwrap(); @@ -155,6 +159,22 @@ enum TimerAction { CheckShutdown(u32), } +#[derive(Copy, Clone, Debug, PartialEq)] +enum CtrlAction { + Increment, + Decrement, + SigAll(Signal), + ShutdownAll, + UpgradeAll, + Status, +} + +#[derive(Clone, Debug, PartialEq)] +struct CtrlRequest { + action: CtrlAction, + tx: Sender, +} + // This is the main event loop fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { @@ -171,6 +191,9 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { println!("Spawned: {}", pid); } + // Ugh, see: http://burntsushi.net/rustdoc/chan/macro.chan_select.html#failure-modes + let ctrl_req_rx = cfg.ctrl_req_rx.clone(); + //// infinite select() loop over timers, signals let mut run = true; loop { @@ -221,6 +244,38 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { } }, }, + ctrl_req_rx.recv() -> maybe_req => + if let Some(req) = maybe_req { match req.action { + CtrlAction::Increment => { + req.tx.send(format!("UNIMPLEMENTED")); + }, + CtrlAction::Decrement => { + req.tx.send(format!("UNIMPLEMENTED")); + }, + CtrlAction::SigAll(sig) => { + for (_, o) in brood.iter_mut() { + o.signal(sig); + } + req.tx.send(format!("UNIMPLEMENTED")); + }, + CtrlAction::ShutdownAll => { + let mut pid_list = vec![]; + for (pid, o) in brood.iter_mut() { + if o.is_active() { + o.shutdown(&mut cfg, &mut timer, timer_tx.clone()); + pid_list.push(pid); + } + } + req.tx.send(format!("UNIMPLEMENTED")); + }, + CtrlAction::UpgradeAll => { + req.tx.send(format!("UNIMPLEMENTED")); + }, + CtrlAction::Status => { + req.tx.send(format!("UNIMPLEMENTED")); + }, + } + }, signal_rx.recv() -> sig => match sig.expect("Error with signal handler") { Signal::CHLD => { loop { @@ -326,6 +381,7 @@ fn main() { } //// Parse Configuration + let (ctrl_req_tx, ctrl_req_rx): (Sender, Receiver) = chan::async(); let mut cfg = EinConfig{ count: 1, childhood: Duration::seconds(3), @@ -336,6 +392,7 @@ fn main() { ipv6_only: matches.opt_present("6"), manual_ack: matches.opt_present("m"), cmd: Command::new(""), + ctrl_req_rx: ctrl_req_rx, }; if let Some(n) = matches.opt_str("number") { @@ -343,6 +400,11 @@ fn main() { } //// Bind Sockets + + // Control socket first + let ctrl_listener = UnixListener::bind("/tmp/einhorn.sock").unwrap(); + // XXX: set mode/permissions/owner? + // These will be tuples: (SocketAddr, SO_REUSEADDR, O_NONBLOCK) let sock_confs: Vec<(SocketAddr, bool, bool)> = matches.opt_strs("bind").iter().map(|b| { let mut r = false; @@ -418,12 +480,48 @@ fn main() { //// Listen for signals (before any fork()) println!("Registering signal handlers..."); + // TODO: Should mask others here? START, etc? let signal_rx = chan_signal::notify(&[Signal::INT, Signal::TERM, - Signal::CHLD, // XXX: PR has been submitted + Signal::CHLD, // NB: PR has been submitted Signal::USR2, Signal::HUP]); + //// Start Constrol Socket Thread + thread::spawn(move || ctrl_socket_serve(ctrl_listener, ctrl_req_tx)); + shepard(cfg, signal_rx); exit(0); } + +fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender) { + let reader = BufReader::new(&stream); + let mut writer = BufWriter::new(&stream); + for line in reader.lines() { + let line = line.unwrap(); + println!("Got message: {}", line); + let (tx, rx): (Sender, Receiver) = chan::async(); + let req = CtrlRequest{ action: CtrlAction::Status, tx: tx }; + ctrl_req_tx.send(req); + let resp = rx.recv().unwrap(); + writer.write_all(resp.as_bytes()).unwrap(); + } + stream.shutdown(std::net::Shutdown::Both).unwrap(); +} + +fn ctrl_socket_serve(listener: UnixListener, ctrl_req_tx: Sender) { + for conn in listener.incoming() { + match conn{ + Ok(conn) => { + let tx = ctrl_req_tx.clone(); + thread::spawn(move || ctrl_socket_handle(conn, tx)); + }, + Err(err) => { + // TODO + println!("control socket err: {}", err); + break; + }, + } + } + drop(listener); +} -- cgit v1.2.3