use crate::SearchEntityType;
use anyhow::{anyhow, Result};
use log::{self, info};
use serde_json::json;
use std::time::Duration;

/// Iterator over search hits, fetched in batches via the Elasticsearch scroll
/// API when the result set is (potentially) large.
pub struct SearchResults {
    pub entity_type: SearchEntityType,
    pub limit: Option<u64>,
    pub count: u64,
    pub took_ms: u64,
    offset: u64,
    batch: Vec<serde_json::Value>,
    scroll_id: Option<String>,
    scroll_url: String,
    http_client: reqwest::blocking::Client,
}

impl Iterator for SearchResults {
    type Item = Result<serde_json::Value>;

    fn next(&mut self) -> Option<Self::Item> {
        // if we already hit the limit, bail early
        if let Some(l) = self.limit {
            if self.offset >= l {
                return None;
            }
        }
        // if the current batch is empty and we are scrolling, refill it
        if self.batch.is_empty() && self.scroll_id.is_some() {
            let response = self
                .http_client
                .get(&self.scroll_url)
                .header("Content-Type", "application/json")
                .body(
                    json!({
                        "scroll": "2m",
                        "scroll_id": self.scroll_id.clone().unwrap(),
                    })
                    .to_string(),
                )
                .send();
            let response = match response {
                Err(e) => return Some(Err(e.into())),
                Ok(v) => v,
            };
            if !response.status().is_success() {
                return Some(Err(anyhow!("search error, status={}", response.status())));
            };
            let body: serde_json::Value = match response.json() {
                Err(e) => return Some(Err(e.into())),
                Ok(v) => v,
            };
            self.scroll_id = Some(body["_scroll_id"].as_str().unwrap().to_string());
            self.batch = body["hits"]["hits"].as_array().unwrap().to_vec();
        }
        // return the next hit from the most recent batch
        if !self.batch.is_empty() {
            self.offset += 1;
            let val = self.batch.pop().unwrap();
            let source = val["_source"].clone();
            return Some(Ok(source));
        }
        // if the batch is empty and couldn't be refilled, terminate
        // TODO: should we raise an error if we ended early?
        None
    }
}

/// Runs a simple query_string search against the index for the given entity
/// type, returning an iterator over the raw `_source` documents of the hits.
pub fn crude_search(
    api_host: &str,
    entity_type: SearchEntityType,
    limit: Option<u64>,
    terms: Vec<String>,
) -> Result<SearchResults> {
    let index = match entity_type {
        SearchEntityType::Release => "fatcat_release",
        SearchEntityType::Container => "fatcat_container",
        SearchEntityType::File => "fatcat_file",
        SearchEntityType::Scholar => "scholar_fulltext",
        SearchEntityType::Reference
        | SearchEntityType::ReferenceIn
        | SearchEntityType::ReferenceOut => "fatcat_ref",
    };
    let http_client = reqwest::blocking::Client::builder()
        .timeout(Duration::from_secs(10))
        .danger_accept_invalid_certs(true)
        .build()
        .expect("ERROR :: Could not build reqwest client");

    let query: String = if terms.is_empty() {
        "*".to_string()
    } else {
        terms.join(" ")
    };
    info!("Search query string: {}", query);
    let request_url = format!("{}/{}/_search", api_host, index);
    let scroll_url = format!("{}/_search/scroll", api_host);

    // sort by _doc for (potentially) very large result sets
    let (scroll_mode, sort_mode, size) = match limit {
        None => (true, "_doc", 100),
        Some(l) if l > 100 => (true, "_doc", 100),
        Some(l) => (false, "_score", l),
    };

    let query_body = match entity_type {
        SearchEntityType::Release => json!({
            "query": {
                "boosting": {
                    "positive": {
                        "bool": {
                            "must": {
                                "query_string": {
                                    "query": query,
                                    "default_operator": "AND",
                                    "analyze_wildcard": true,
                                    "allow_leading_wildcard": false,
                                    "lenient": true,
                                    "fields": [
                                        "title^2",
                                        "biblio",
                                    ],
                                },
                            },
                            "should": {
                                "term": { "in_ia": true },
                            },
                        },
                    },
                    "negative": {
                        "bool": {
                            "should": [
                                {"bool": { "must_not": { "exists": { "field": "title" }}}},
                                {"bool": { "must_not": { "exists": { "field": "year" }}}},
                                {"bool": { "must_not": { "exists": { "field": "type" }}}},
                                {"bool": { "must_not": { "exists": { "field": "stage" }}}},
                            ],
                        },
                    },
                    "negative_boost": 0.5,
                },
            },
            "size": size,
            "sort": [ sort_mode ],
            "track_total_hits": true,
        }),
        SearchEntityType::Container => json!({
            "query": {
                "query_string": {
                    "query": query,
"default_operator": "AND", "analyze_wildcard": true, "allow_leading_wildcard": false, "lenient": true, "fields": [ "name^2", "biblio", ], }, }, "size": size, "sort": [ sort_mode ], "track_total_hits": true, }), SearchEntityType::File => json!({ "query": { "query_string": { "query": query, "default_operator": "AND", "analyze_wildcard": true, "allow_leading_wildcard": false, "lenient": true, }, }, "size": size, "sort": [ sort_mode ], "track_total_hits": true, }), SearchEntityType::Scholar => json!({ "query": { "boosting": { "positive": { "bool": { "must": { "query_string": { "query": query, "default_operator": "AND", "analyze_wildcard": true, "allow_leading_wildcard": false, "lenient": true, "quote_field_suffix": ".exact", "fields": [ "title^4", "biblio_all^3", "everything", ], }, }, "should": { "terms": { "access_type": ["ia_sim", "ia_file", "wayback"]}, }, }, }, "negative": { "bool": { "should": [ {"bool": { "must_not" : { "exists": { "field": "year" }}}}, {"bool": { "must_not" : { "exists": { "field": "type" }}}}, {"bool": { "must_not" : { "exists": { "field": "stage" }}}}, {"bool": { "must_not" : { "exists": { "field": "biblio.container_name" }}}}, ], }, }, "negative_boost": 0.5, }, }, "size": size, "sort": [ sort_mode ], "track_total_hits": true, }), SearchEntityType::Reference | SearchEntityType::ReferenceIn | SearchEntityType::ReferenceOut => json!({ "query": { "query_string": { "query": query, "default_operator": "AND", "analyze_wildcard": true, "allow_leading_wildcard": false, "lenient": true, }, }, "size": size, "sort": [ sort_mode ], "track_total_hits": true, }), }.to_string(); let mut request = http_client .get(&request_url) .header("Content-Type", "application/json") .body(query_body); if scroll_mode { request = request.query(&[("scroll", "2m")]); } let response = request.send()?; if !response.status().is_success() { return Err(anyhow!("search error, status={}", response.status())); } //println!("{:?}", response); let body: serde_json::Value = response.json()?; let scroll_id = if scroll_mode { Some(body["_scroll_id"].as_str().unwrap().to_string()) } else { None }; Ok(SearchResults { entity_type, limit, count: body["hits"]["total"]["value"].as_u64().unwrap(), took_ms: body["took"].as_u64().unwrap(), offset: 0, batch: body["hits"]["hits"].as_array().unwrap().to_vec(), scroll_id, scroll_url, http_client, }) }