diff options
Diffstat (limited to 'rust/src')
| -rw-r--r-- | rust/src/bin/fatcat-export.rs | 78 | 
1 files changed, 50 insertions, 28 deletions
| diff --git a/rust/src/bin/fatcat-export.rs b/rust/src/bin/fatcat-export.rs index fdb69e26..0d6d69b1 100644 --- a/rust/src/bin/fatcat-export.rs +++ b/rust/src/bin/fatcat-export.rs @@ -10,11 +10,11 @@ extern crate fatcat;  extern crate fatcat_api_spec;  #[macro_use]  extern crate log; -extern crate env_logger; -extern crate uuid;  extern crate crossbeam_channel; -extern crate serde_json; +extern crate env_logger;  extern crate num_cpus; +extern crate serde_json; +extern crate uuid;  use clap::{App, Arg};  use dotenv::dotenv; @@ -22,13 +22,13 @@ use std::env;  use diesel::prelude::*;  use diesel::r2d2::ConnectionManager; +use fatcat::api_entity_crud::*; +use fatcat::api_helpers::*; +use fatcat::errors::*; +use fatcat::ConnectionPool;  use fatcat_api_spec::models::*;  use std::str::FromStr;  use uuid::Uuid; -use fatcat::ConnectionPool; -use fatcat::api_helpers::*; -use fatcat::api_entity_crud::*; -use fatcat::errors::*;  use error_chain::ChainedError;  use std::thread; @@ -100,7 +100,10 @@ generic_loop_work!(loop_work_file, FileEntity);  generic_loop_work!(loop_work_release, ReleaseEntity);  generic_loop_work!(loop_work_work, WorkEntity); -fn loop_printer(output_receiver: channel::Receiver<String>, done_sender: channel::Sender<()>) -> Result<()> { +fn loop_printer( +    output_receiver: channel::Receiver<String>, +    done_sender: channel::Sender<()>, +) -> Result<()> {      let output = std::io::stdout();      // XXX should log...      // let mut buf_output = BufWriter::new(output.lock()); @@ -134,9 +137,18 @@ fn parse_line(s: &str) -> Result<IdentRow> {  #[test]  fn test_parse_line() { -    assert!(parse_line("00000000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t").is_ok()); -    assert!(parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t").is_err()); -    assert!(parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001").is_err()); +    assert!( +        parse_line("00000000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t") +            .is_ok() +    ); +    assert!( +        parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t") +            .is_err() +    ); +    assert!( +        parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001") +            .is_err() +    );      assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001").is_ok());  } @@ -151,8 +163,12 @@ fn test_parse_line() {  // 4. read rows, pushing to row channel  //      => every N lines, log to stderr  // 5. wait for all channels to finish, and quit -pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: ExportEntityType, redirects: bool) -> Result<()> { - +pub fn do_export( +    num_workers: usize, +    expand: Option<ExpandFlags>, +    entity_type: ExportEntityType, +    redirects: bool, +) -> Result<()> {      let db_pool = database_worker_pool()?;      let buf_input = BufReader::new(std::io::stdin());      let (row_sender, row_receiver) = channel::bounded(CHANNEL_BUFFER_LEN); @@ -168,21 +184,26 @@ pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: E          let row_receiver = row_receiver.clone();          let output_sender = output_sender.clone();          match entity_type { -            ExportEntityType::Container => -                thread::spawn(move || loop_work_container(row_receiver, output_sender, &db_conn, expand)), -            ExportEntityType::Creator => -                thread::spawn(move || loop_work_creator(row_receiver, output_sender, &db_conn, expand)), -            ExportEntityType::File => -                thread::spawn(move || loop_work_file(row_receiver, output_sender, &db_conn, expand)), -            ExportEntityType::Release => -                thread::spawn(move || loop_work_release(row_receiver, output_sender, &db_conn, expand)), -            ExportEntityType::Work => -                thread::spawn(move || loop_work_work(row_receiver, output_sender, &db_conn, expand)), +            ExportEntityType::Container => thread::spawn(move || { +                loop_work_container(row_receiver, output_sender, &db_conn, expand) +            }), +            ExportEntityType::Creator => thread::spawn(move || { +                loop_work_creator(row_receiver, output_sender, &db_conn, expand) +            }), +            ExportEntityType::File => { +                thread::spawn(move || loop_work_file(row_receiver, output_sender, &db_conn, expand)) +            } +            ExportEntityType::Release => thread::spawn(move || { +                loop_work_release(row_receiver, output_sender, &db_conn, expand) +            }), +            ExportEntityType::Work => { +                thread::spawn(move || loop_work_work(row_receiver, output_sender, &db_conn, expand)) +            }          };      }      drop(output_sender);      // Start printer thread -    thread::spawn(move || loop_printer(output_receiver, done_sender).expect("printing to stdout") ); +    thread::spawn(move || loop_printer(output_receiver, done_sender).expect("printing to stdout"));      let mut count = 0;      for line in buf_input.lines() { @@ -200,12 +221,14 @@ pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: E      }      drop(row_sender);      done_receiver.recv(); -    info!("Done reading ({} lines), waiting for workers to exit...", count); +    info!( +        "Done reading ({} lines), waiting for workers to exit...", +        count +    );      Ok(())  }  fn run() -> Result<()> { -      let m = App::new("fatcat-export")          .version(env!("CARGO_PKG_VERSION"))          .author("Bryan Newbold <bnewbold@archive.org>") @@ -236,8 +259,7 @@ fn run() -> Result<()> {      } else {          "info"      }; -    let env = env_logger::Env::default() -            .filter_or(env_logger::DEFAULT_FILTER_ENV, log_level); +    let env = env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, log_level);      env_logger::Builder::from_env(env).init();      do_export( | 
