diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/lib.rs | 49 | ||||
-rw-r--r-- | src/main.rs | 5 |
2 files changed, 41 insertions, 13 deletions
@@ -1,6 +1,7 @@ use serde::Deserialize; use hyper::{Request, Body, Method, Uri}; +use serde_json::json; pub mod parse; @@ -43,6 +44,19 @@ pub enum ProxyError { NotFound(String), } +impl ProxyError { + + pub fn to_json(&self) -> serde_json::Value { + json!({ + "error": { + "reason": format!("{:?}", self), + "type": "unknown", + }, + "status": 500, + }) + } +} + pub async fn filter_request(req: Request<Body>, config: &ProxyConfig) -> Result<Request<Body>, ProxyError> { let (parts, body) = req.into_parts(); @@ -56,7 +70,8 @@ pub async fn filter_request(req: Request<Body>, config: &ProxyConfig) -> Result< return Err(ProxyError::NotSupported("only request paths with up to three segments allowed".to_string())) } - let params: UrlQueryParams = serde_urlencoded::from_str(parts.uri.query().unwrap_or("")).unwrap(); + let params: UrlQueryParams = serde_urlencoded::from_str(parts.uri.query().unwrap_or("")) + .map_err(|e| ProxyError::ParseError(e.to_string()))?; // this is sort of like a router let body = match (&parts.method, path_chunks.as_slice()) { @@ -64,15 +79,21 @@ pub async fn filter_request(req: Request<Body>, config: &ProxyConfig) -> Result< Body::empty() }, (&Method::POST, ["_search", "scroll"]) | (&Method::DELETE, ["_search", "scroll"]) => { - let whole_body = hyper::body::to_bytes(body).await.unwrap(); + let whole_body = hyper::body::to_bytes(body) + .await + .map_err(|e| ProxyError::Malformed(e.to_string()))?; filter_scroll_request(¶ms, &whole_body, config)? }, (&Method::GET, [index, "_search"]) | (&Method::POST, [index, "_search"]) => { - let whole_body = hyper::body::to_bytes(body).await.unwrap(); + let whole_body = hyper::body::to_bytes(body) + .await + .map_err(|e| ProxyError::Malformed(e.to_string()))?; filter_search_request(index, ¶ms, &whole_body, config)? }, (&Method::GET, [index, "_count"]) | (&Method::POST, [index, "_count"]) => { - let whole_body = hyper::body::to_bytes(body).await.unwrap(); + let whole_body = hyper::body::to_bytes(body) + .await + .map_err(|e| ProxyError::Malformed(e.to_string()))?; filter_search_request(index, ¶ms, &whole_body, config)? }, (&Method::GET, [index, "_doc", key]) | (&Method::GET, [index, "_source", key]) => { @@ -81,7 +102,7 @@ pub async fn filter_request(req: Request<Body>, config: &ProxyConfig) -> Result< _ => Err(ProxyError::NotSupported("unknown endpoint".to_string()))?, }; - let upstream_query = serde_urlencoded::to_string(params).unwrap(); + let upstream_query = serde_urlencoded::to_string(params).expect("re-encoding URL parameters"); let upstream_query_and_params = if upstream_query.len() > 0 { format!("{}?{}", req_path, upstream_query) } else { @@ -92,21 +113,26 @@ pub async fn filter_request(req: Request<Body>, config: &ProxyConfig) -> Result< .authority(config.upstream_addr.as_ref().unwrap_or(&"localhost:9200".to_string()).as_str()) .path_and_query(upstream_query_and_params.as_str()) .build() - .unwrap(); + .expect("constructing upstream request URI"); let upstream_req = Request::builder() .uri(upstream_uri) .method(&parts.method) .body(body) - .unwrap(); + .expect("constructing upstream request"); Ok(upstream_req) } -pub fn filter_scroll_request(_params: &UrlQueryParams, _body: &[u8], _config: &ProxyConfig) -> Result<Body, ProxyError> { +pub fn filter_scroll_request(_params: &UrlQueryParams, body: &[u8], _config: &ProxyConfig) -> Result<Body, ProxyError> { // XXX // TODO: check that scroll_id is not "_all" - //let _parsed: ScrollBody = serde_json::from_str(&body).unwrap(); - Err(ProxyError::NotSupported("not yet implemented".to_string())) + if body.len() > 0 { + let parsed: parse::ScrollBody = serde_json::from_slice(body) + .map_err(|e| ProxyError::ParseError(e.to_string()))?; + Ok(Body::from(serde_json::to_string(&parsed).unwrap())) + } else { + Ok(Body::empty()) + } } pub fn filter_read_request(index: &str, _endpoint: &str, _key: &str, _params: &UrlQueryParams, config: &ProxyConfig) -> Result<Body, ProxyError>{ @@ -123,7 +149,8 @@ pub fn filter_search_request(index: &str, _params: &UrlQueryParams, body: &[u8], } // XXX: more checks if body.len() > 0 { - let parsed: parse::ScrollBody = serde_json::from_slice(body).unwrap(); + let parsed: parse::SearchBody = serde_json::from_slice(body) + .map_err(|e| ProxyError::ParseError(e.to_string()))?; Ok(Body::from(serde_json::to_string(&parsed).unwrap())) } else { Ok(Body::empty()) diff --git a/src/main.rs b/src/main.rs index 5e6c20c..2b25f32 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,8 +17,9 @@ async fn upstream_req(req: Request<Body>, config: ProxyConfig) -> Result<Respons } Err(other) => { Response::builder() - .status(StatusCode::NOT_FOUND) - .body(format!("oh noooo! {:?}", other).into()) + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header("Content-Type", "application/json; charset=UTF-8") + .body(serde_json::to_string(&other.to_json()).unwrap().into()) .unwrap() }, }; |