From a0e2f837f7ffc33b125f0036d01f546a1f8ac9d6 Mon Sep 17 00:00:00 2001 From: bnewbold Date: Tue, 11 Oct 2016 21:51:32 -0700 Subject: work in progress on control socket --- Cargo.lock | 86 +++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 6 ++-- doc/refs.txt | 3 ++ src/main.rs | 112 +++++++++++++++++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 197 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f85d35f..776a265 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7,8 +7,11 @@ dependencies = [ "chrono 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "getopts 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", + "json 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "nix 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "slog 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "slog-envlogger 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", "timer 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -78,6 +81,11 @@ dependencies = [ "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "env_logger" version = "0.3.5" @@ -92,6 +100,21 @@ name = "getopts" version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "isatty" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "json" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -106,6 +129,11 @@ name = "lazy_static" version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "lazy_static" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "libc" version = "0.2.16" @@ -207,6 +235,55 @@ name = "semver" version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "slog" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "slog-envlogger" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "regex 0.1.77 (registry+https://github.com/rust-lang/crates.io-index)", + "slog 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "slog-stdlog 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "slog-term 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "slog-stdlog" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "slog 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "slog-term 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "slog-stream" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "slog 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "slog-term" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "chrono 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)", + "isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "slog 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "slog-stream 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "thread-id" version = "2.0.0" @@ -272,10 +349,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum chan-signal 0.1.6 (git+https://github.com/bnewbold/chan-signal)" = "" "checksum chan-signal 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "afbba6202dc1d10ff08c3b04e00e4d2d6cf5effee56cd9fee92928be6692379a" "checksum chrono 0.2.25 (registry+https://github.com/rust-lang/crates.io-index)" = "9213f7cd7c27e95c2b57c49f0e69b1ea65b27138da84a170133fd21b07659c00" +"checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97" "checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f" "checksum getopts 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "d9047cfbd08a437050b363d35ef160452c5fe8ea5187ae0a624708c91581d685" +"checksum isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7408a548dc0e406b7912d9f84c261cc533c1866e047644a811c133c56041ac0c" +"checksum json 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "772d39b38286c6ebeb2c0412a8d03afd81c3c3ba0046571b9ce9c2ef64475698" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum lazy_static 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "cf186d1a8aa5f5bee5fd662bc9c1b949e0259e1bcc379d1f006847b0080c7417" +"checksum lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "49247ec2a285bb3dcb23cbd9c35193c025e7251bfce77c1d5da97e6362dffe7f" "checksum libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)" = "408014cace30ee0f767b1c4517980646a573ec61a57957aeeabcac8ac0a02e8d" "checksum log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ab83497bf8bf4ed2a74259c1c802351fcd67a65baa86394b6ba73c36f4838054" "checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20" @@ -289,6 +370,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum regex-syntax 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "48f0573bcee95a48da786f8823465b5f2a1fae288a55407aca991e5b3e0eae11" "checksum rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "c5f5376ea5e30ce23c03eb77cbe4962b988deead10910c372b226388b594c084" "checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac" +"checksum slog 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d816659be2527f0f85437f31ebd3bea98a2553f83c41a6404abae9f530c9ab62" +"checksum slog-envlogger 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "dfea715bb310c33c8f90e659bce5b95e39851348b9a7e2a77495a069662def78" +"checksum slog-stdlog 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9c50950e13491ed54f1417bdfdf572f38d9b6d597c48d865ac155fa49fe5357f" +"checksum slog-stream 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "33e181e37e13b935f2f2f6a01edf143ebd3520915b46528678024bf34df3b922" +"checksum slog-term 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "04d9d55d8d57a200c9b676f8113a8e185d01fffa891961fa7f2804711423f27e" "checksum thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a9539db560102d1cef46b8b78ce737ff0bb64e7e18d35b2a5688f7d097d0ff03" "checksum thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8576dbbfcaef9641452d5cf0df9b0e7eeab7694956dd33bb61515fb8f18cfdd5" "checksum time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)" = "3c7ec6d62a20df54e07ab3b78b9a3932972f4b7981de295563686849eb3989af" diff --git a/Cargo.toml b/Cargo.toml index fec9e07..ca068a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,9 +13,9 @@ time = "0.1" chrono = "0.2" chan = "0.1" chan-signal = "0.1" -#slog = "1.0" -#slog-envlogger = "0.5" -#jsonrpc-core = "3.0" +slog = "1.0" +slog-envlogger = "0.5" +json = "*" [replace] "chan-signal:0.1.6" = { git = 'https://github.com/bnewbold/chan-signal' } diff --git a/doc/refs.txt b/doc/refs.txt index aaeff81..42e9512 100644 --- a/doc/refs.txt +++ b/doc/refs.txt @@ -8,3 +8,6 @@ http://www.vidarholen.net/contents/blog/?p=34 "Should you be scared of Unix signals?" http://jvns.ca/blog/2016/06/13/should-you-be-scared-of-signals/ + +"Reap zombie processes using a SIGCHLD handler" +http://www.microhowto.info/howto/reap_zombie_processes_using_a_sigchld_handler.html diff --git a/src/main.rs b/src/main.rs index 4e994c3..6a96af1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,6 +27,8 @@ extern crate timer; extern crate time; extern crate chan_signal; +use std::io::prelude::*; +use std::io::{BufReader, BufWriter}; use std::env; use std::u64; use std::str::FromStr; @@ -36,7 +38,8 @@ use std::process::Child; use std::net::SocketAddr; use std::net::TcpListener; use std::net::ToSocketAddrs; -//use std::time::Instant; +use std::os::unix::net::{UnixStream, UnixListener}; +use std::thread; use time::Duration; use std::collections::HashMap; use getopts::Options; @@ -45,6 +48,7 @@ use chan_signal::Signal; use chan::{Sender, Receiver}; use std::os::unix::io::{RawFd, IntoRawFd}; +// TODO: split this into read-only config and mutable state struct EinConfig { childhood: Duration, graceperiod: Duration, @@ -55,6 +59,7 @@ struct EinConfig { cmd: Command, ipv4_only: bool, ipv6_only: bool, + ctrl_req_rx: Receiver, } #[derive(Copy, Clone, Debug, PartialEq)] @@ -68,7 +73,6 @@ enum OffspringState { struct Offspring { state: OffspringState, process: Child, - // birthday: Instant, // specifies the generation attempts: u64, timer_guard: Option, replaces: Option, @@ -80,7 +84,6 @@ impl Offspring { let mut o = Offspring { state: OffspringState::Infancy, process: cfg.cmd.spawn().expect("error spawning"), - // birthday: Instant::now(), attempts: 0, timer_guard: None, replaces: None, @@ -138,10 +141,11 @@ impl Offspring { return; } let nix_sig = match sig { - Signal::HUP => nix::sys::signal::Signal::SIGHUP, - Signal::INT => nix::sys::signal::Signal::SIGINT, - Signal::TERM => nix::sys::signal::Signal::SIGTERM, + Signal::HUP => nix::sys::signal::Signal::SIGHUP, + Signal::INT => nix::sys::signal::Signal::SIGINT, + Signal::TERM => nix::sys::signal::Signal::SIGTERM, Signal::KILL => nix::sys::signal::Signal::SIGKILL, + Signal::USR2 => nix::sys::signal::Signal::SIGUSR2, _ => { println!("Unexpected signal: {:?}", sig); return; }, }; nix::sys::signal::kill(self.process.id() as i32, nix_sig).unwrap(); @@ -155,6 +159,22 @@ enum TimerAction { CheckShutdown(u32), } +#[derive(Copy, Clone, Debug, PartialEq)] +enum CtrlAction { + Increment, + Decrement, + SigAll(Signal), + ShutdownAll, + UpgradeAll, + Status, +} + +#[derive(Clone, Debug, PartialEq)] +struct CtrlRequest { + action: CtrlAction, + tx: Sender, +} + // This is the main event loop fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { @@ -171,6 +191,9 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { println!("Spawned: {}", pid); } + // Ugh, see: http://burntsushi.net/rustdoc/chan/macro.chan_select.html#failure-modes + let ctrl_req_rx = cfg.ctrl_req_rx.clone(); + //// infinite select() loop over timers, signals let mut run = true; loop { @@ -221,6 +244,38 @@ fn shepard(mut cfg: EinConfig, signal_rx: Receiver) { } }, }, + ctrl_req_rx.recv() -> maybe_req => + if let Some(req) = maybe_req { match req.action { + CtrlAction::Increment => { + req.tx.send(format!("UNIMPLEMENTED")); + }, + CtrlAction::Decrement => { + req.tx.send(format!("UNIMPLEMENTED")); + }, + CtrlAction::SigAll(sig) => { + for (_, o) in brood.iter_mut() { + o.signal(sig); + } + req.tx.send(format!("UNIMPLEMENTED")); + }, + CtrlAction::ShutdownAll => { + let mut pid_list = vec![]; + for (pid, o) in brood.iter_mut() { + if o.is_active() { + o.shutdown(&mut cfg, &mut timer, timer_tx.clone()); + pid_list.push(pid); + } + } + req.tx.send(format!("UNIMPLEMENTED")); + }, + CtrlAction::UpgradeAll => { + req.tx.send(format!("UNIMPLEMENTED")); + }, + CtrlAction::Status => { + req.tx.send(format!("UNIMPLEMENTED")); + }, + } + }, signal_rx.recv() -> sig => match sig.expect("Error with signal handler") { Signal::CHLD => { loop { @@ -326,6 +381,7 @@ fn main() { } //// Parse Configuration + let (ctrl_req_tx, ctrl_req_rx): (Sender, Receiver) = chan::async(); let mut cfg = EinConfig{ count: 1, childhood: Duration::seconds(3), @@ -336,6 +392,7 @@ fn main() { ipv6_only: matches.opt_present("6"), manual_ack: matches.opt_present("m"), cmd: Command::new(""), + ctrl_req_rx: ctrl_req_rx, }; if let Some(n) = matches.opt_str("number") { @@ -343,6 +400,11 @@ fn main() { } //// Bind Sockets + + // Control socket first + let ctrl_listener = UnixListener::bind("/tmp/einhorn.sock").unwrap(); + // XXX: set mode/permissions/owner? + // These will be tuples: (SocketAddr, SO_REUSEADDR, O_NONBLOCK) let sock_confs: Vec<(SocketAddr, bool, bool)> = matches.opt_strs("bind").iter().map(|b| { let mut r = false; @@ -418,12 +480,48 @@ fn main() { //// Listen for signals (before any fork()) println!("Registering signal handlers..."); + // TODO: Should mask others here? START, etc? let signal_rx = chan_signal::notify(&[Signal::INT, Signal::TERM, - Signal::CHLD, // XXX: PR has been submitted + Signal::CHLD, // NB: PR has been submitted Signal::USR2, Signal::HUP]); + //// Start Constrol Socket Thread + thread::spawn(move || ctrl_socket_serve(ctrl_listener, ctrl_req_tx)); + shepard(cfg, signal_rx); exit(0); } + +fn ctrl_socket_handle(stream: UnixStream, ctrl_req_tx: Sender) { + let reader = BufReader::new(&stream); + let mut writer = BufWriter::new(&stream); + for line in reader.lines() { + let line = line.unwrap(); + println!("Got message: {}", line); + let (tx, rx): (Sender, Receiver) = chan::async(); + let req = CtrlRequest{ action: CtrlAction::Status, tx: tx }; + ctrl_req_tx.send(req); + let resp = rx.recv().unwrap(); + writer.write_all(resp.as_bytes()).unwrap(); + } + stream.shutdown(std::net::Shutdown::Both).unwrap(); +} + +fn ctrl_socket_serve(listener: UnixListener, ctrl_req_tx: Sender) { + for conn in listener.incoming() { + match conn{ + Ok(conn) => { + let tx = ctrl_req_tx.clone(); + thread::spawn(move || ctrl_socket_handle(conn, tx)); + }, + Err(err) => { + // TODO + println!("control socket err: {}", err); + break; + }, + } + } + drop(listener); +} -- cgit v1.2.3