summaryrefslogtreecommitdiffstats
path: root/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib.rs')
-rw-r--r--src/lib.rs49
1 files changed, 38 insertions, 11 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 00b7b38..fc13e64 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,6 +1,7 @@
use serde::Deserialize;
use hyper::{Request, Body, Method, Uri};
+use serde_json::json;
pub mod parse;
@@ -43,6 +44,19 @@ pub enum ProxyError {
NotFound(String),
}
+impl ProxyError {
+
+ pub fn to_json(&self) -> serde_json::Value {
+ json!({
+ "error": {
+ "reason": format!("{:?}", self),
+ "type": "unknown",
+ },
+ "status": 500,
+ })
+ }
+}
+
pub async fn filter_request(req: Request<Body>, config: &ProxyConfig) -> Result<Request<Body>, ProxyError> {
let (parts, body) = req.into_parts();
@@ -56,7 +70,8 @@ pub async fn filter_request(req: Request<Body>, config: &ProxyConfig) -> Result<
return Err(ProxyError::NotSupported("only request paths with up to three segments allowed".to_string()))
}
- let params: UrlQueryParams = serde_urlencoded::from_str(parts.uri.query().unwrap_or("")).unwrap();
+ let params: UrlQueryParams = serde_urlencoded::from_str(parts.uri.query().unwrap_or(""))
+ .map_err(|e| ProxyError::ParseError(e.to_string()))?;
// this is sort of like a router
let body = match (&parts.method, path_chunks.as_slice()) {
@@ -64,15 +79,21 @@ pub async fn filter_request(req: Request<Body>, config: &ProxyConfig) -> Result<
Body::empty()
},
(&Method::POST, ["_search", "scroll"]) | (&Method::DELETE, ["_search", "scroll"]) => {
- let whole_body = hyper::body::to_bytes(body).await.unwrap();
+ let whole_body = hyper::body::to_bytes(body)
+ .await
+ .map_err(|e| ProxyError::Malformed(e.to_string()))?;
filter_scroll_request(&params, &whole_body, config)?
},
(&Method::GET, [index, "_search"]) | (&Method::POST, [index, "_search"]) => {
- let whole_body = hyper::body::to_bytes(body).await.unwrap();
+ let whole_body = hyper::body::to_bytes(body)
+ .await
+ .map_err(|e| ProxyError::Malformed(e.to_string()))?;
filter_search_request(index, &params, &whole_body, config)?
},
(&Method::GET, [index, "_count"]) | (&Method::POST, [index, "_count"]) => {
- let whole_body = hyper::body::to_bytes(body).await.unwrap();
+ let whole_body = hyper::body::to_bytes(body)
+ .await
+ .map_err(|e| ProxyError::Malformed(e.to_string()))?;
filter_search_request(index, &params, &whole_body, config)?
},
(&Method::GET, [index, "_doc", key]) | (&Method::GET, [index, "_source", key]) => {
@@ -81,7 +102,7 @@ pub async fn filter_request(req: Request<Body>, config: &ProxyConfig) -> Result<
_ => Err(ProxyError::NotSupported("unknown endpoint".to_string()))?,
};
- let upstream_query = serde_urlencoded::to_string(params).unwrap();
+ let upstream_query = serde_urlencoded::to_string(params).expect("re-encoding URL parameters");
let upstream_query_and_params = if upstream_query.len() > 0 {
format!("{}?{}", req_path, upstream_query)
} else {
@@ -92,21 +113,26 @@ pub async fn filter_request(req: Request<Body>, config: &ProxyConfig) -> Result<
.authority(config.upstream_addr.as_ref().unwrap_or(&"localhost:9200".to_string()).as_str())
.path_and_query(upstream_query_and_params.as_str())
.build()
- .unwrap();
+ .expect("constructing upstream request URI");
let upstream_req = Request::builder()
.uri(upstream_uri)
.method(&parts.method)
.body(body)
- .unwrap();
+ .expect("constructing upstream request");
Ok(upstream_req)
}
-pub fn filter_scroll_request(_params: &UrlQueryParams, _body: &[u8], _config: &ProxyConfig) -> Result<Body, ProxyError> {
+pub fn filter_scroll_request(_params: &UrlQueryParams, body: &[u8], _config: &ProxyConfig) -> Result<Body, ProxyError> {
// XXX
// TODO: check that scroll_id is not "_all"
- //let _parsed: ScrollBody = serde_json::from_str(&body).unwrap();
- Err(ProxyError::NotSupported("not yet implemented".to_string()))
+ if body.len() > 0 {
+ let parsed: parse::ScrollBody = serde_json::from_slice(body)
+ .map_err(|e| ProxyError::ParseError(e.to_string()))?;
+ Ok(Body::from(serde_json::to_string(&parsed).unwrap()))
+ } else {
+ Ok(Body::empty())
+ }
}
pub fn filter_read_request(index: &str, _endpoint: &str, _key: &str, _params: &UrlQueryParams, config: &ProxyConfig) -> Result<Body, ProxyError>{
@@ -123,7 +149,8 @@ pub fn filter_search_request(index: &str, _params: &UrlQueryParams, body: &[u8],
}
// XXX: more checks
if body.len() > 0 {
- let parsed: parse::ScrollBody = serde_json::from_slice(body).unwrap();
+ let parsed: parse::SearchBody = serde_json::from_slice(body)
+ .map_err(|e| ProxyError::ParseError(e.to_string()))?;
Ok(Body::from(serde_json::to_string(&parsed).unwrap()))
} else {
Ok(Body::empty())