diff options
Diffstat (limited to 'rust/src/bin')
-rw-r--r-- | rust/src/bin/fatcat-export.rs | 78 |
1 files changed, 50 insertions, 28 deletions
diff --git a/rust/src/bin/fatcat-export.rs b/rust/src/bin/fatcat-export.rs index fdb69e26..0d6d69b1 100644 --- a/rust/src/bin/fatcat-export.rs +++ b/rust/src/bin/fatcat-export.rs @@ -10,11 +10,11 @@ extern crate fatcat; extern crate fatcat_api_spec; #[macro_use] extern crate log; -extern crate env_logger; -extern crate uuid; extern crate crossbeam_channel; -extern crate serde_json; +extern crate env_logger; extern crate num_cpus; +extern crate serde_json; +extern crate uuid; use clap::{App, Arg}; use dotenv::dotenv; @@ -22,13 +22,13 @@ use std::env; use diesel::prelude::*; use diesel::r2d2::ConnectionManager; +use fatcat::api_entity_crud::*; +use fatcat::api_helpers::*; +use fatcat::errors::*; +use fatcat::ConnectionPool; use fatcat_api_spec::models::*; use std::str::FromStr; use uuid::Uuid; -use fatcat::ConnectionPool; -use fatcat::api_helpers::*; -use fatcat::api_entity_crud::*; -use fatcat::errors::*; use error_chain::ChainedError; use std::thread; @@ -100,7 +100,10 @@ generic_loop_work!(loop_work_file, FileEntity); generic_loop_work!(loop_work_release, ReleaseEntity); generic_loop_work!(loop_work_work, WorkEntity); -fn loop_printer(output_receiver: channel::Receiver<String>, done_sender: channel::Sender<()>) -> Result<()> { +fn loop_printer( + output_receiver: channel::Receiver<String>, + done_sender: channel::Sender<()>, +) -> Result<()> { let output = std::io::stdout(); // XXX should log... // let mut buf_output = BufWriter::new(output.lock()); @@ -134,9 +137,18 @@ fn parse_line(s: &str) -> Result<IdentRow> { #[test] fn test_parse_line() { - assert!(parse_line("00000000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t").is_ok()); - assert!(parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t").is_err()); - assert!(parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001").is_err()); + assert!( + parse_line("00000000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t") + .is_ok() + ); + assert!( + parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t") + .is_err() + ); + assert!( + parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001") + .is_err() + ); assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001").is_ok()); } @@ -151,8 +163,12 @@ fn test_parse_line() { // 4. read rows, pushing to row channel // => every N lines, log to stderr // 5. wait for all channels to finish, and quit -pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: ExportEntityType, redirects: bool) -> Result<()> { - +pub fn do_export( + num_workers: usize, + expand: Option<ExpandFlags>, + entity_type: ExportEntityType, + redirects: bool, +) -> Result<()> { let db_pool = database_worker_pool()?; let buf_input = BufReader::new(std::io::stdin()); let (row_sender, row_receiver) = channel::bounded(CHANNEL_BUFFER_LEN); @@ -168,21 +184,26 @@ pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: E let row_receiver = row_receiver.clone(); let output_sender = output_sender.clone(); match entity_type { - ExportEntityType::Container => - thread::spawn(move || loop_work_container(row_receiver, output_sender, &db_conn, expand)), - ExportEntityType::Creator => - thread::spawn(move || loop_work_creator(row_receiver, output_sender, &db_conn, expand)), - ExportEntityType::File => - thread::spawn(move || loop_work_file(row_receiver, output_sender, &db_conn, expand)), - ExportEntityType::Release => - thread::spawn(move || loop_work_release(row_receiver, output_sender, &db_conn, expand)), - ExportEntityType::Work => - thread::spawn(move || loop_work_work(row_receiver, output_sender, &db_conn, expand)), + ExportEntityType::Container => thread::spawn(move || { + loop_work_container(row_receiver, output_sender, &db_conn, expand) + }), + ExportEntityType::Creator => thread::spawn(move || { + loop_work_creator(row_receiver, output_sender, &db_conn, expand) + }), + ExportEntityType::File => { + thread::spawn(move || loop_work_file(row_receiver, output_sender, &db_conn, expand)) + } + ExportEntityType::Release => thread::spawn(move || { + loop_work_release(row_receiver, output_sender, &db_conn, expand) + }), + ExportEntityType::Work => { + thread::spawn(move || loop_work_work(row_receiver, output_sender, &db_conn, expand)) + } }; } drop(output_sender); // Start printer thread - thread::spawn(move || loop_printer(output_receiver, done_sender).expect("printing to stdout") ); + thread::spawn(move || loop_printer(output_receiver, done_sender).expect("printing to stdout")); let mut count = 0; for line in buf_input.lines() { @@ -200,12 +221,14 @@ pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: E } drop(row_sender); done_receiver.recv(); - info!("Done reading ({} lines), waiting for workers to exit...", count); + info!( + "Done reading ({} lines), waiting for workers to exit...", + count + ); Ok(()) } fn run() -> Result<()> { - let m = App::new("fatcat-export") .version(env!("CARGO_PKG_VERSION")) .author("Bryan Newbold <bnewbold@archive.org>") @@ -236,8 +259,7 @@ fn run() -> Result<()> { } else { "info" }; - let env = env_logger::Env::default() - .filter_or(env_logger::DEFAULT_FILTER_ENV, log_level); + let env = env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, log_level); env_logger::Builder::from_env(env).init(); do_export( |