diff options
Diffstat (limited to 'rust/src')
| -rw-r--r-- | rust/src/bin/fatcat-export.rs | 89 | 
1 files changed, 65 insertions, 24 deletions
| diff --git a/rust/src/bin/fatcat-export.rs b/rust/src/bin/fatcat-export.rs index 6f9b92cf..1af321ba 100644 --- a/rust/src/bin/fatcat-export.rs +++ b/rust/src/bin/fatcat-export.rs @@ -9,16 +9,14 @@ extern crate error_chain;  extern crate fatcat;  extern crate fatcat_api_spec;  #[macro_use] -extern crate slog; -extern crate slog_async; -extern crate slog_term; +extern crate log; +extern crate env_logger;  extern crate uuid;  extern crate crossbeam_channel;  extern crate serde_json;  extern crate num_cpus;  use clap::{App, Arg}; -use slog::{Drain, Logger};  use dotenv::dotenv;  use std::env; @@ -32,6 +30,7 @@ use fatcat::api_helpers::*;  use fatcat::api_entity_crud::*;  use fatcat::errors::*; +use error_chain::ChainedError;  use std::thread;  //use std::io::{Stdout,StdoutLock};  use crossbeam_channel as channel; @@ -71,18 +70,26 @@ pub fn database_worker_pool() -> Result<ConnectionPool> {  macro_rules! generic_loop_work {      ($fn_name:ident, $entity_model:ident) => { -        fn $fn_name(row_receiver: channel::Receiver<IdentRow>, output_sender: channel::Sender<String>, db_conn: &DbConn, expand: Option<ExpandFlags>) -> Result<()> { -            for row in row_receiver { -                let mut entity = $entity_model::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; -                //let mut entity = ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; -                entity.state = Some("active".to_string()); // XXX -                entity.ident = Some(row.ident_id.to_string()); -                if let Some(expand) = expand { -                    entity.db_expand(db_conn, expand)?; +        fn $fn_name(row_receiver: channel::Receiver<IdentRow>, output_sender: channel::Sender<String>, db_conn: &DbConn, expand: Option<ExpandFlags>) { +            let result: Result<()> = (|| { +                for row in row_receiver { +                    let mut entity = $entity_model::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row")) +                        .chain_err(|| "reading entity from database")?; +                    //let mut entity = ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; +                    entity.state = Some("active".to_string()); // XXX +                    entity.ident = Some(row.ident_id.to_string()); +                    if let Some(expand) = expand { +                        entity.db_expand(db_conn, expand) +                            .chain_err(|| "expanding sub-entities from database")?; +                    } +                    output_sender.send(serde_json::to_string(&entity)?);                  } -                output_sender.send(serde_json::to_string(&entity)?); +                Ok(()) +            })(); +            if let Err(ref e) = result { +                error!("{}", e.display_chain())              } -            Ok(()) +            result.unwrap()          }      }  } @@ -114,14 +121,23 @@ fn parse_line(s: String) -> Result<IdentRow> {      }      Ok(IdentRow {          ident_id: FatCatId::from_uuid(&Uuid::from_str(&fields[0])?), -        rev_id: Some(Uuid::from_str(&fields[1])?), -        redirect_id: None, +        rev_id: match fields[1].as_ref() { +            "\\N" => None, +            val => Some(Uuid::from_str(&val)?), +        }, +        redirect_id: match fields[2].as_ref() { +            "\\N" => None, +            val => Some(FatCatId::from_uuid(&Uuid::from_str(&val)?)), +        },      })  }  #[test]  fn test_parse_line() { -    assert!(false) +    assert!(parse_line("00000000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t\\N").is_ok()); +    assert!(parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t\\N").is_err()); +    assert!(parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001").is_err()); +    assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001").is_ok());  }  // Use num_cpus/2, or CLI arg for worker count @@ -135,7 +151,7 @@ fn test_parse_line() {  // 4. read rows, pushing to row channel  //      => every N lines, log to stderr  // 5. wait for all channels to finish, and quit -pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: ExportEntityType) -> Result<()> { +pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: ExportEntityType, redirects: bool) -> Result<()> {      let db_pool = database_worker_pool()?;      let buf_input = BufReader::new(std::io::stdin()); @@ -143,6 +159,8 @@ pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: E      let (output_sender, output_receiver) = channel::bounded(CHANNEL_BUFFER_LEN);      let (done_sender, done_receiver) = channel::bounded(0); +    info!("Starting an export of {} entities", entity_type); +      // Start row worker threads      assert!(num_workers > 0);      for _ in 0..num_workers { @@ -164,15 +182,25 @@ pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: E      }      drop(output_sender);      // Start printer thread -    thread::spawn(move || loop_printer(output_receiver, done_sender)); +    thread::spawn(move || loop_printer(output_receiver, done_sender).expect("printing to stdout") ); +    let mut count = 0;      for line in buf_input.lines() {          let line = line?;          let row = parse_line(line)?; -        row_sender.send(row); +        match (row.rev_id, row.redirect_id, redirects) { +            (None, _, _) => (), +            (Some(_), Some(_), false) => (), +            _ => row_sender.send(row), +        } +        count += 1; +        if count % 1000 == 0 { +            info!("processed {} lines...", count); +        }      }      drop(row_sender);      done_receiver.recv(); +    info!("Done reading ({} lines), waiting for workers to exit...", count);      Ok(())  } @@ -182,28 +210,41 @@ fn run() -> Result<()> {          .version(env!("CARGO_PKG_VERSION"))          .author("Bryan Newbold <bnewbold@archive.org>")          .about("Fast exports of database entities from an id list") +        .arg(Arg::from_usage("<entity_type> 'what entity type the idents correspond to'") +            .possible_values(&ExportEntityType::variants()) +            .case_insensitive(true))          .args_from_usage( -            "-f --workers=[workers] 'number of threads (database connections) to use' -             --expand=[expand] 'sub-entities to include in dump' -             <entity_type> 'what entity type the idents correspond to'") +            "-j --workers=[workers] 'number of threads (database connections) to use' +             -q --quiet 'less status output to stderr' +             --include-redirects 'include redirected idents (normally skipped)' +             --expand=[expand] 'sub-entities to include in dump'")          .after_help("Reads a ident table TSV dump from stdin (aka, ident_id, rev_id, redirect_id), \              and outputs JSON (one entity per line). Database connection info read from environment \              (DATABASE_URL, same as fatcatd).")          .get_matches();      let num_workers: usize = match m.value_of("workers") { -        Some(v) => value_t_or_exit!(m.value_of("workers"), usize), +        Some(_) => value_t_or_exit!(m.value_of("workers"), usize),          None => std::cmp::min(1, num_cpus::get() / 2) as usize,      };      let expand = match m.value_of("expand") {          Some(s) => Some(ExpandFlags::from_str(&s)?),          None => None,      }; +    let log_level = if m.is_present("quiet") { +        "warn" +    } else { +        "info" +    }; +    let env = env_logger::Env::default() +            .filter_or(env_logger::DEFAULT_FILTER_ENV, log_level); +    env_logger::Builder::from_env(env).init();      do_export(          num_workers,          expand,          value_t_or_exit!(m.value_of("entity_type"), ExportEntityType), +        m.is_present("include_redirects"),      )  } | 
