diff options
Diffstat (limited to 'rust')
| -rw-r--r-- | rust/Cargo.lock | 1 | ||||
| -rw-r--r-- | rust/Cargo.toml | 3 | ||||
| -rw-r--r-- | rust/src/bin/fatcat-export.rs | 105 | 
3 files changed, 80 insertions, 29 deletions
| diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 6d12659e..2a7d6017 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -365,6 +365,7 @@ dependencies = [   "iron-test 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",   "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",   "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",   "regex 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",   "serde_json 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)",   "sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 0e14c583..7c6695d5 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -13,7 +13,7 @@ fatcat-api-spec = {version = "*", path = "fatcat-api-spec"}  diesel = { version = "1.3", features = ["postgres", "uuid", "serde_json", "chrono", "r2d2"] }  diesel_migrations = "1.3"  dotenv = "0.9.0" -clap = "*" +clap = "2"  error-chain = "0.12"  uuid = "0.5"  log = "*" @@ -37,6 +37,7 @@ serde_json = "1.0"  # Command-line tools  crossbeam-channel = "0.2" +num_cpus = "1"  # Unused (hyper 0.11 and https)  #hyper-openssl = {version = "0.2", optional = true} diff --git a/rust/src/bin/fatcat-export.rs b/rust/src/bin/fatcat-export.rs index d7371c87..6f9b92cf 100644 --- a/rust/src/bin/fatcat-export.rs +++ b/rust/src/bin/fatcat-export.rs @@ -1,5 +1,6 @@  //! JSON Export Helper +#[macro_use]  extern crate clap;  extern crate diesel;  extern crate dotenv; @@ -14,6 +15,7 @@ extern crate slog_term;  extern crate uuid;  extern crate crossbeam_channel;  extern crate serde_json; +extern crate num_cpus;  use clap::{App, Arg};  use slog::{Drain, Logger}; @@ -39,6 +41,17 @@ use std::io::{BufReader, BufWriter};  const CHANNEL_BUFFER_LEN: usize = 200; +arg_enum!{ +    #[derive(PartialEq, Debug, Clone, Copy)] +    pub enum ExportEntityType { +        Creator, +        Container, +        File, +        Release, +        Work +    } +} +  struct IdentRow {      ident_id: FatCatId,      rev_id: Option<Uuid>, @@ -56,20 +69,30 @@ pub fn database_worker_pool() -> Result<ConnectionPool> {      Ok(pool)  } -fn loop_work(row_receiver: channel::Receiver<IdentRow>, output_sender: channel::Sender<String>, db_conn: &DbConn, expand: Option<ExpandFlags>) -> Result<()> { -    for row in row_receiver { -        // TODO: based on types -        let mut entity = ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; -        entity.state = Some("active".to_string()); // TODO -        entity.ident = Some(row.ident_id.to_string()); -        if let Some(expand) = expand { -            entity.db_expand(db_conn, expand)?; +macro_rules! generic_loop_work { +    ($fn_name:ident, $entity_model:ident) => { +        fn $fn_name(row_receiver: channel::Receiver<IdentRow>, output_sender: channel::Sender<String>, db_conn: &DbConn, expand: Option<ExpandFlags>) -> Result<()> { +            for row in row_receiver { +                let mut entity = $entity_model::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; +                //let mut entity = ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; +                entity.state = Some("active".to_string()); // XXX +                entity.ident = Some(row.ident_id.to_string()); +                if let Some(expand) = expand { +                    entity.db_expand(db_conn, expand)?; +                } +                output_sender.send(serde_json::to_string(&entity)?); +            } +            Ok(())          } -        output_sender.send(serde_json::to_string(&entity)?);      } -    Ok(())  } +generic_loop_work!(loop_work_container, ContainerEntity); +generic_loop_work!(loop_work_creator, CreatorEntity); +generic_loop_work!(loop_work_file, FileEntity); +generic_loop_work!(loop_work_release, ReleaseEntity); +generic_loop_work!(loop_work_work, WorkEntity); +  fn loop_printer(output_receiver: channel::Receiver<String>, done_sender: channel::Sender<()>) -> Result<()> {      let output = std::io::stdout();      // XXX should log... @@ -112,21 +135,32 @@ fn test_parse_line() {  // 4. read rows, pushing to row channel  //      => every N lines, log to stderr  // 5. wait for all channels to finish, and quit -pub fn run() -> Result<()> { +pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: ExportEntityType) -> Result<()> {      let db_pool = database_worker_pool()?;      let buf_input = BufReader::new(std::io::stdin()); -    let worker_count = 4;      let (row_sender, row_receiver) = channel::bounded(CHANNEL_BUFFER_LEN);      let (output_sender, output_receiver) = channel::bounded(CHANNEL_BUFFER_LEN);      let (done_sender, done_receiver) = channel::bounded(0);      // Start row worker threads -    for _ in 0..worker_count { +    assert!(num_workers > 0); +    for _ in 0..num_workers {          let db_conn = db_pool.get().expect("database pool");          let row_receiver = row_receiver.clone();          let output_sender = output_sender.clone(); -        thread::spawn(move || loop_work(row_receiver, output_sender, &db_conn, None)); +        match entity_type { +            ExportEntityType::Container => +                thread::spawn(move || loop_work_container(row_receiver, output_sender, &db_conn, expand)), +            ExportEntityType::Creator => +                thread::spawn(move || loop_work_creator(row_receiver, output_sender, &db_conn, expand)), +            ExportEntityType::File => +                thread::spawn(move || loop_work_file(row_receiver, output_sender, &db_conn, expand)), +            ExportEntityType::Release => +                thread::spawn(move || loop_work_release(row_receiver, output_sender, &db_conn, expand)), +            ExportEntityType::Work => +                thread::spawn(move || loop_work_work(row_receiver, output_sender, &db_conn, expand)), +        };      }      drop(output_sender);      // Start printer thread @@ -142,20 +176,35 @@ pub fn run() -> Result<()> {      Ok(())  } -fn main() { -    /* -    let matches = App::new("server") -        .arg( -            Arg::with_name("https") -                .long("https") -                .help("Whether to use HTTPS or not"), -        ) +fn run() -> Result<()> { + +    let m = App::new("fatcat-export") +        .version(env!("CARGO_PKG_VERSION")) +        .author("Bryan Newbold <bnewbold@archive.org>") +        .about("Fast exports of database entities from an id list") +        .args_from_usage( +            "-f --workers=[workers] 'number of threads (database connections) to use' +             --expand=[expand] 'sub-entities to include in dump' +             <entity_type> 'what entity type the idents correspond to'") +        .after_help("Reads a ident table TSV dump from stdin (aka, ident_id, rev_id, redirect_id), \ +            and outputs JSON (one entity per line). Database connection info read from environment \ +            (DATABASE_URL, same as fatcatd).")          .get_matches(); -    let decorator = slog_term::TermDecorator::new().build(); -    let drain = slog_term::CompactFormat::new(decorator).build().fuse(); -    let drain = slog_async::Async::new(drain).build().fuse(); -    let logger = Logger::root(drain, o!()); -    */ -    run().expect("success") +    let num_workers: usize = match m.value_of("workers") { +        Some(v) => value_t_or_exit!(m.value_of("workers"), usize), +        None => std::cmp::min(1, num_cpus::get() / 2) as usize, +    }; +    let expand = match m.value_of("expand") { +        Some(s) => Some(ExpandFlags::from_str(&s)?), +        None => None, +    }; + +    do_export( +        num_workers, +        expand, +        value_t_or_exit!(m.value_of("entity_type"), ExportEntityType), +    )  } + +quick_main!(run); | 
