use serde::Deserialize; use hyper::{Request, Body, Method, Uri}; pub mod parse; use parse::UrlQueryParams; #[derive(Default, Deserialize, Debug, Clone)] pub struct ProxyConfig { pub bind_addr: Option, // 127.0.0.1:9292 pub upstream_addr: Option, // 127.0.0.1:9200 pub allow_all_indices: Option, pub index: Vec } #[derive(Deserialize, Debug, Clone)] pub struct IndexConfig { pub name: String, } impl ProxyConfig { pub fn allow_index(&self, name: &str) -> bool { if self.allow_all_indices == Some(true) { return true } for index in &self.index { if index.name == name { return true } } false } } #[derive(Debug)] pub enum ProxyError { Malformed(String), ParseError(String), NotAllowed(String), NotSupported(String), NotFound(String), } pub async fn filter_request(req: Request, config: &ProxyConfig) -> Result, ProxyError> { let (parts, body) = req.into_parts(); // split path into at most 3 chunks let mut req_path = parts.uri.path(); if req_path.starts_with("/") { req_path = &req_path[1..]; } let path_chunks: Vec<&str> = req_path.split("/").collect(); if path_chunks.len() > 3 { return Err(ProxyError::NotSupported("only request paths with up to three segments allowed".to_string())) } let params = parse_params(parts.uri.query())?; // this is sort of like a router let body = match (&parts.method, path_chunks.as_slice()) { (&Method::GET, [""]) | (&Method::HEAD, [""]) => { Body::empty() }, (&Method::POST, ["_search", "scroll"]) | (&Method::DELETE, ["_search", "scroll"]) => { let whole_body = hyper::body::to_bytes(body).await.unwrap(); filter_scroll_request(¶ms, &whole_body, config)? }, (&Method::GET, [index, "_search"]) | (&Method::POST, [index, "_search"]) => { let whole_body = hyper::body::to_bytes(body).await.unwrap(); filter_search_request(index, ¶ms, &whole_body, config)? }, (&Method::GET, [index, "_count"]) | (&Method::POST, [index, "_count"]) => { let whole_body = hyper::body::to_bytes(body).await.unwrap(); filter_search_request(index, ¶ms, &whole_body, config)? }, (&Method::GET, [index, "_doc", key]) | (&Method::GET, [index, "_source", key]) => { filter_read_request(index, path_chunks[1], key, ¶ms, config)? }, _ => Err(ProxyError::NotSupported("unknown endpoint".to_string()))?, }; let upstream_query = serialize_params(¶ms); let upstream_query_and_params = if upstream_query.len() > 0 { format!("{}?{}", req_path, upstream_query) } else { req_path.to_string() }; let upstream_uri = Uri::builder() .scheme("http") .authority(config.upstream_addr.as_ref().unwrap_or(&"localhost:9200".to_string()).as_str()) .path_and_query(upstream_query_and_params.as_str()) .build() .unwrap(); let upstream_req = Request::builder() .uri(upstream_uri) .method(&parts.method) .body(body) .unwrap(); Ok(upstream_req) } pub fn filter_scroll_request(_params: &UrlQueryParams, _body: &[u8], _config: &ProxyConfig) -> Result { // XXX // TODO: check that scroll_id is not "_all" //let _parsed: ScrollBody = serde_json::from_str(&body).unwrap(); Err(ProxyError::NotSupported("not yet implemented".to_string())) } pub fn filter_read_request(index: &str, _endpoint: &str, _key: &str, _params: &UrlQueryParams, config: &ProxyConfig) -> Result{ if !config.allow_index(index) { return Err(ProxyError::NotAllowed(format!("index doesn't exist or isn't proxied: {}", index))); } // XXX: no body needed? Ok(Body::empty()) } pub fn filter_search_request(index: &str, _params: &UrlQueryParams, body: &[u8], config: &ProxyConfig) -> Result { if !config.allow_index(index) { return Err(ProxyError::NotAllowed(format!("index doesn't exist or isn't proxied: {}", index))); } // XXX: more checks if body.len() > 0 { let parsed: parse::ScrollBody = serde_json::from_slice(body).unwrap(); Ok(Body::from(serde_json::to_string(&parsed).unwrap())) } else { Ok(Body::empty()) } } pub fn parse_params(query: Option<&str>) -> Result { println!("params: {:?}", query); let raw_params: serde_json::map::Map = query .map(|q| { url::form_urlencoded::parse(q.as_bytes()) .into_owned() .map(|(k,v)| (k, serde_json::from_str(&v).unwrap())) .collect() }) .unwrap_or_else(serde_json::map::Map::new); let parsed: UrlQueryParams = serde_json::from_value(serde_json::Value::Object(raw_params)).unwrap(); Ok(parsed) } pub fn serialize_params(params: &UrlQueryParams) -> String { let json_value = serde_json::to_value(params).unwrap(); let value_map: serde_json::map::Map = match json_value { serde_json::Value::Object(val) => val, _ => panic!("expected an object"), }; let mut builder = url::form_urlencoded::Serializer::new(String::new()); // XXX: array and object types should raise an error? for (k, v) in value_map.iter() { match v { serde_json::Value::Null | serde_json::Value::Object(_) | serde_json::Value::Array(_) => (), serde_json::Value::Bool(_) | serde_json::Value::Number(_) | serde_json::Value::String(_) => { let string_val = serde_json::to_string(&v).unwrap(); builder.append_pair(k, &string_val); } } } builder.finish() }