From f7f18bb588023e869c347ddfd23bf1ed5d16e527 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 11 Sep 2018 22:09:58 -0700 Subject: improvements to fatcat-export output --- rust/src/bin/fatcat-export.rs | 89 +++++++++++++++++++++++++++++++------------ 1 file changed, 65 insertions(+), 24 deletions(-) (limited to 'rust/src') diff --git a/rust/src/bin/fatcat-export.rs b/rust/src/bin/fatcat-export.rs index 6f9b92cf..1af321ba 100644 --- a/rust/src/bin/fatcat-export.rs +++ b/rust/src/bin/fatcat-export.rs @@ -9,16 +9,14 @@ extern crate error_chain; extern crate fatcat; extern crate fatcat_api_spec; #[macro_use] -extern crate slog; -extern crate slog_async; -extern crate slog_term; +extern crate log; +extern crate env_logger; extern crate uuid; extern crate crossbeam_channel; extern crate serde_json; extern crate num_cpus; use clap::{App, Arg}; -use slog::{Drain, Logger}; use dotenv::dotenv; use std::env; @@ -32,6 +30,7 @@ use fatcat::api_helpers::*; use fatcat::api_entity_crud::*; use fatcat::errors::*; +use error_chain::ChainedError; use std::thread; //use std::io::{Stdout,StdoutLock}; use crossbeam_channel as channel; @@ -71,18 +70,26 @@ pub fn database_worker_pool() -> Result { macro_rules! generic_loop_work { ($fn_name:ident, $entity_model:ident) => { - fn $fn_name(row_receiver: channel::Receiver, output_sender: channel::Sender, db_conn: &DbConn, expand: Option) -> Result<()> { - for row in row_receiver { - let mut entity = $entity_model::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; - //let mut entity = ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; - entity.state = Some("active".to_string()); // XXX - entity.ident = Some(row.ident_id.to_string()); - if let Some(expand) = expand { - entity.db_expand(db_conn, expand)?; + fn $fn_name(row_receiver: channel::Receiver, output_sender: channel::Sender, db_conn: &DbConn, expand: Option) { + let result: Result<()> = (|| { + for row in row_receiver { + let mut entity = $entity_model::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row")) + .chain_err(|| "reading entity from database")?; + //let mut entity = ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; + entity.state = Some("active".to_string()); // XXX + entity.ident = Some(row.ident_id.to_string()); + if let Some(expand) = expand { + entity.db_expand(db_conn, expand) + .chain_err(|| "expanding sub-entities from database")?; + } + output_sender.send(serde_json::to_string(&entity)?); } - output_sender.send(serde_json::to_string(&entity)?); + Ok(()) + })(); + if let Err(ref e) = result { + error!("{}", e.display_chain()) } - Ok(()) + result.unwrap() } } } @@ -114,14 +121,23 @@ fn parse_line(s: String) -> Result { } Ok(IdentRow { ident_id: FatCatId::from_uuid(&Uuid::from_str(&fields[0])?), - rev_id: Some(Uuid::from_str(&fields[1])?), - redirect_id: None, + rev_id: match fields[1].as_ref() { + "\\N" => None, + val => Some(Uuid::from_str(&val)?), + }, + redirect_id: match fields[2].as_ref() { + "\\N" => None, + val => Some(FatCatId::from_uuid(&Uuid::from_str(&val)?)), + }, }) } #[test] fn test_parse_line() { - assert!(false) + assert!(parse_line("00000000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t\\N").is_ok()); + assert!(parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t\\N").is_err()); + assert!(parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001").is_err()); + assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001").is_ok()); } // Use num_cpus/2, or CLI arg for worker count @@ -135,7 +151,7 @@ fn test_parse_line() { // 4. read rows, pushing to row channel // => every N lines, log to stderr // 5. wait for all channels to finish, and quit -pub fn do_export(num_workers: usize, expand: Option, entity_type: ExportEntityType) -> Result<()> { +pub fn do_export(num_workers: usize, expand: Option, entity_type: ExportEntityType, redirects: bool) -> Result<()> { let db_pool = database_worker_pool()?; let buf_input = BufReader::new(std::io::stdin()); @@ -143,6 +159,8 @@ pub fn do_export(num_workers: usize, expand: Option, entity_type: E let (output_sender, output_receiver) = channel::bounded(CHANNEL_BUFFER_LEN); let (done_sender, done_receiver) = channel::bounded(0); + info!("Starting an export of {} entities", entity_type); + // Start row worker threads assert!(num_workers > 0); for _ in 0..num_workers { @@ -164,15 +182,25 @@ pub fn do_export(num_workers: usize, expand: Option, entity_type: E } drop(output_sender); // Start printer thread - thread::spawn(move || loop_printer(output_receiver, done_sender)); + thread::spawn(move || loop_printer(output_receiver, done_sender).expect("printing to stdout") ); + let mut count = 0; for line in buf_input.lines() { let line = line?; let row = parse_line(line)?; - row_sender.send(row); + match (row.rev_id, row.redirect_id, redirects) { + (None, _, _) => (), + (Some(_), Some(_), false) => (), + _ => row_sender.send(row), + } + count += 1; + if count % 1000 == 0 { + info!("processed {} lines...", count); + } } drop(row_sender); done_receiver.recv(); + info!("Done reading ({} lines), waiting for workers to exit...", count); Ok(()) } @@ -182,28 +210,41 @@ fn run() -> Result<()> { .version(env!("CARGO_PKG_VERSION")) .author("Bryan Newbold ") .about("Fast exports of database entities from an id list") + .arg(Arg::from_usage(" 'what entity type the idents correspond to'") + .possible_values(&ExportEntityType::variants()) + .case_insensitive(true)) .args_from_usage( - "-f --workers=[workers] 'number of threads (database connections) to use' - --expand=[expand] 'sub-entities to include in dump' - 'what entity type the idents correspond to'") + "-j --workers=[workers] 'number of threads (database connections) to use' + -q --quiet 'less status output to stderr' + --include-redirects 'include redirected idents (normally skipped)' + --expand=[expand] 'sub-entities to include in dump'") .after_help("Reads a ident table TSV dump from stdin (aka, ident_id, rev_id, redirect_id), \ and outputs JSON (one entity per line). Database connection info read from environment \ (DATABASE_URL, same as fatcatd).") .get_matches(); let num_workers: usize = match m.value_of("workers") { - Some(v) => value_t_or_exit!(m.value_of("workers"), usize), + Some(_) => value_t_or_exit!(m.value_of("workers"), usize), None => std::cmp::min(1, num_cpus::get() / 2) as usize, }; let expand = match m.value_of("expand") { Some(s) => Some(ExpandFlags::from_str(&s)?), None => None, }; + let log_level = if m.is_present("quiet") { + "warn" + } else { + "info" + }; + let env = env_logger::Env::default() + .filter_or(env_logger::DEFAULT_FILTER_ENV, log_level); + env_logger::Builder::from_env(env).init(); do_export( num_workers, expand, value_t_or_exit!(m.value_of("entity_type"), ExportEntityType), + m.is_present("include_redirects"), ) } -- cgit v1.2.3