From 538cd2ec602a044dc36ca81d9a2a07863080c764 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 11 Sep 2018 20:56:50 -0700 Subject: improvements to fatcat-export --- rust/Cargo.lock | 1 + rust/Cargo.toml | 3 +- rust/src/bin/fatcat-export.rs | 105 +++++++++++++++++++++++++++++++----------- 3 files changed, 80 insertions(+), 29 deletions(-) (limited to 'rust') diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 6d12659e..2a7d6017 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -365,6 +365,7 @@ dependencies = [ "iron-test 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", "sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 0e14c583..7c6695d5 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -13,7 +13,7 @@ fatcat-api-spec = {version = "*", path = "fatcat-api-spec"} diesel = { version = "1.3", features = ["postgres", "uuid", "serde_json", "chrono", "r2d2"] } diesel_migrations = "1.3" dotenv = "0.9.0" -clap = "*" +clap = "2" error-chain = "0.12" uuid = "0.5" log = "*" @@ -37,6 +37,7 @@ serde_json = "1.0" # Command-line tools crossbeam-channel = "0.2" +num_cpus = "1" # Unused (hyper 0.11 and https) #hyper-openssl = {version = "0.2", optional = true} diff --git a/rust/src/bin/fatcat-export.rs b/rust/src/bin/fatcat-export.rs index d7371c87..6f9b92cf 100644 --- a/rust/src/bin/fatcat-export.rs +++ b/rust/src/bin/fatcat-export.rs @@ -1,5 +1,6 @@ //! JSON Export Helper +#[macro_use] extern crate clap; extern crate diesel; extern crate dotenv; @@ -14,6 +15,7 @@ extern crate slog_term; extern crate uuid; extern crate crossbeam_channel; extern crate serde_json; +extern crate num_cpus; use clap::{App, Arg}; use slog::{Drain, Logger}; @@ -39,6 +41,17 @@ use std::io::{BufReader, BufWriter}; const CHANNEL_BUFFER_LEN: usize = 200; +arg_enum!{ + #[derive(PartialEq, Debug, Clone, Copy)] + pub enum ExportEntityType { + Creator, + Container, + File, + Release, + Work + } +} + struct IdentRow { ident_id: FatCatId, rev_id: Option, @@ -56,20 +69,30 @@ pub fn database_worker_pool() -> Result { Ok(pool) } -fn loop_work(row_receiver: channel::Receiver, output_sender: channel::Sender, db_conn: &DbConn, expand: Option) -> Result<()> { - for row in row_receiver { - // TODO: based on types - let mut entity = ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; - entity.state = Some("active".to_string()); // TODO - entity.ident = Some(row.ident_id.to_string()); - if let Some(expand) = expand { - entity.db_expand(db_conn, expand)?; +macro_rules! generic_loop_work { + ($fn_name:ident, $entity_model:ident) => { + fn $fn_name(row_receiver: channel::Receiver, output_sender: channel::Sender, db_conn: &DbConn, expand: Option) -> Result<()> { + for row in row_receiver { + let mut entity = $entity_model::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; + //let mut entity = ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; + entity.state = Some("active".to_string()); // XXX + entity.ident = Some(row.ident_id.to_string()); + if let Some(expand) = expand { + entity.db_expand(db_conn, expand)?; + } + output_sender.send(serde_json::to_string(&entity)?); + } + Ok(()) } - output_sender.send(serde_json::to_string(&entity)?); } - Ok(()) } +generic_loop_work!(loop_work_container, ContainerEntity); +generic_loop_work!(loop_work_creator, CreatorEntity); +generic_loop_work!(loop_work_file, FileEntity); +generic_loop_work!(loop_work_release, ReleaseEntity); +generic_loop_work!(loop_work_work, WorkEntity); + fn loop_printer(output_receiver: channel::Receiver, done_sender: channel::Sender<()>) -> Result<()> { let output = std::io::stdout(); // XXX should log... @@ -112,21 +135,32 @@ fn test_parse_line() { // 4. read rows, pushing to row channel // => every N lines, log to stderr // 5. wait for all channels to finish, and quit -pub fn run() -> Result<()> { +pub fn do_export(num_workers: usize, expand: Option, entity_type: ExportEntityType) -> Result<()> { let db_pool = database_worker_pool()?; let buf_input = BufReader::new(std::io::stdin()); - let worker_count = 4; let (row_sender, row_receiver) = channel::bounded(CHANNEL_BUFFER_LEN); let (output_sender, output_receiver) = channel::bounded(CHANNEL_BUFFER_LEN); let (done_sender, done_receiver) = channel::bounded(0); // Start row worker threads - for _ in 0..worker_count { + assert!(num_workers > 0); + for _ in 0..num_workers { let db_conn = db_pool.get().expect("database pool"); let row_receiver = row_receiver.clone(); let output_sender = output_sender.clone(); - thread::spawn(move || loop_work(row_receiver, output_sender, &db_conn, None)); + match entity_type { + ExportEntityType::Container => + thread::spawn(move || loop_work_container(row_receiver, output_sender, &db_conn, expand)), + ExportEntityType::Creator => + thread::spawn(move || loop_work_creator(row_receiver, output_sender, &db_conn, expand)), + ExportEntityType::File => + thread::spawn(move || loop_work_file(row_receiver, output_sender, &db_conn, expand)), + ExportEntityType::Release => + thread::spawn(move || loop_work_release(row_receiver, output_sender, &db_conn, expand)), + ExportEntityType::Work => + thread::spawn(move || loop_work_work(row_receiver, output_sender, &db_conn, expand)), + }; } drop(output_sender); // Start printer thread @@ -142,20 +176,35 @@ pub fn run() -> Result<()> { Ok(()) } -fn main() { - /* - let matches = App::new("server") - .arg( - Arg::with_name("https") - .long("https") - .help("Whether to use HTTPS or not"), - ) +fn run() -> Result<()> { + + let m = App::new("fatcat-export") + .version(env!("CARGO_PKG_VERSION")) + .author("Bryan Newbold ") + .about("Fast exports of database entities from an id list") + .args_from_usage( + "-f --workers=[workers] 'number of threads (database connections) to use' + --expand=[expand] 'sub-entities to include in dump' + 'what entity type the idents correspond to'") + .after_help("Reads a ident table TSV dump from stdin (aka, ident_id, rev_id, redirect_id), \ + and outputs JSON (one entity per line). Database connection info read from environment \ + (DATABASE_URL, same as fatcatd).") .get_matches(); - let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::CompactFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - let logger = Logger::root(drain, o!()); - */ - run().expect("success") + let num_workers: usize = match m.value_of("workers") { + Some(v) => value_t_or_exit!(m.value_of("workers"), usize), + None => std::cmp::min(1, num_cpus::get() / 2) as usize, + }; + let expand = match m.value_of("expand") { + Some(s) => Some(ExpandFlags::from_str(&s)?), + None => None, + }; + + do_export( + num_workers, + expand, + value_t_or_exit!(m.value_of("entity_type"), ExportEntityType), + ) } + +quick_main!(run); -- cgit v1.2.3