diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2018-09-11 20:56:50 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-09-11 20:56:50 -0700 |
commit | 538cd2ec602a044dc36ca81d9a2a07863080c764 (patch) | |
tree | 354dd3580596b2893a6dec4cd81e40d4167bcfcd /rust/src | |
parent | d45deae242f6a256b52874d5a6995e3a7e0bdab4 (diff) | |
download | fatcat-538cd2ec602a044dc36ca81d9a2a07863080c764.tar.gz fatcat-538cd2ec602a044dc36ca81d9a2a07863080c764.zip |
improvements to fatcat-export
Diffstat (limited to 'rust/src')
-rw-r--r-- | rust/src/bin/fatcat-export.rs | 105 |
1 files changed, 77 insertions, 28 deletions
diff --git a/rust/src/bin/fatcat-export.rs b/rust/src/bin/fatcat-export.rs index d7371c87..6f9b92cf 100644 --- a/rust/src/bin/fatcat-export.rs +++ b/rust/src/bin/fatcat-export.rs @@ -1,5 +1,6 @@ //! JSON Export Helper +#[macro_use] extern crate clap; extern crate diesel; extern crate dotenv; @@ -14,6 +15,7 @@ extern crate slog_term; extern crate uuid; extern crate crossbeam_channel; extern crate serde_json; +extern crate num_cpus; use clap::{App, Arg}; use slog::{Drain, Logger}; @@ -39,6 +41,17 @@ use std::io::{BufReader, BufWriter}; const CHANNEL_BUFFER_LEN: usize = 200; +arg_enum!{ + #[derive(PartialEq, Debug, Clone, Copy)] + pub enum ExportEntityType { + Creator, + Container, + File, + Release, + Work + } +} + struct IdentRow { ident_id: FatCatId, rev_id: Option<Uuid>, @@ -56,20 +69,30 @@ pub fn database_worker_pool() -> Result<ConnectionPool> { Ok(pool) } -fn loop_work(row_receiver: channel::Receiver<IdentRow>, output_sender: channel::Sender<String>, db_conn: &DbConn, expand: Option<ExpandFlags>) -> Result<()> { - for row in row_receiver { - // TODO: based on types - let mut entity = ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; - entity.state = Some("active".to_string()); // TODO - entity.ident = Some(row.ident_id.to_string()); - if let Some(expand) = expand { - entity.db_expand(db_conn, expand)?; +macro_rules! generic_loop_work { + ($fn_name:ident, $entity_model:ident) => { + fn $fn_name(row_receiver: channel::Receiver<IdentRow>, output_sender: channel::Sender<String>, db_conn: &DbConn, expand: Option<ExpandFlags>) -> Result<()> { + for row in row_receiver { + let mut entity = $entity_model::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; + //let mut entity = ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?; + entity.state = Some("active".to_string()); // XXX + entity.ident = Some(row.ident_id.to_string()); + if let Some(expand) = expand { + entity.db_expand(db_conn, expand)?; + } + output_sender.send(serde_json::to_string(&entity)?); + } + Ok(()) } - output_sender.send(serde_json::to_string(&entity)?); } - Ok(()) } +generic_loop_work!(loop_work_container, ContainerEntity); +generic_loop_work!(loop_work_creator, CreatorEntity); +generic_loop_work!(loop_work_file, FileEntity); +generic_loop_work!(loop_work_release, ReleaseEntity); +generic_loop_work!(loop_work_work, WorkEntity); + fn loop_printer(output_receiver: channel::Receiver<String>, done_sender: channel::Sender<()>) -> Result<()> { let output = std::io::stdout(); // XXX should log... @@ -112,21 +135,32 @@ fn test_parse_line() { // 4. read rows, pushing to row channel // => every N lines, log to stderr // 5. wait for all channels to finish, and quit -pub fn run() -> Result<()> { +pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: ExportEntityType) -> Result<()> { let db_pool = database_worker_pool()?; let buf_input = BufReader::new(std::io::stdin()); - let worker_count = 4; let (row_sender, row_receiver) = channel::bounded(CHANNEL_BUFFER_LEN); let (output_sender, output_receiver) = channel::bounded(CHANNEL_BUFFER_LEN); let (done_sender, done_receiver) = channel::bounded(0); // Start row worker threads - for _ in 0..worker_count { + assert!(num_workers > 0); + for _ in 0..num_workers { let db_conn = db_pool.get().expect("database pool"); let row_receiver = row_receiver.clone(); let output_sender = output_sender.clone(); - thread::spawn(move || loop_work(row_receiver, output_sender, &db_conn, None)); + match entity_type { + ExportEntityType::Container => + thread::spawn(move || loop_work_container(row_receiver, output_sender, &db_conn, expand)), + ExportEntityType::Creator => + thread::spawn(move || loop_work_creator(row_receiver, output_sender, &db_conn, expand)), + ExportEntityType::File => + thread::spawn(move || loop_work_file(row_receiver, output_sender, &db_conn, expand)), + ExportEntityType::Release => + thread::spawn(move || loop_work_release(row_receiver, output_sender, &db_conn, expand)), + ExportEntityType::Work => + thread::spawn(move || loop_work_work(row_receiver, output_sender, &db_conn, expand)), + }; } drop(output_sender); // Start printer thread @@ -142,20 +176,35 @@ pub fn run() -> Result<()> { Ok(()) } -fn main() { - /* - let matches = App::new("server") - .arg( - Arg::with_name("https") - .long("https") - .help("Whether to use HTTPS or not"), - ) +fn run() -> Result<()> { + + let m = App::new("fatcat-export") + .version(env!("CARGO_PKG_VERSION")) + .author("Bryan Newbold <bnewbold@archive.org>") + .about("Fast exports of database entities from an id list") + .args_from_usage( + "-f --workers=[workers] 'number of threads (database connections) to use' + --expand=[expand] 'sub-entities to include in dump' + <entity_type> 'what entity type the idents correspond to'") + .after_help("Reads a ident table TSV dump from stdin (aka, ident_id, rev_id, redirect_id), \ + and outputs JSON (one entity per line). Database connection info read from environment \ + (DATABASE_URL, same as fatcatd).") .get_matches(); - let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::CompactFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - let logger = Logger::root(drain, o!()); - */ - run().expect("success") + let num_workers: usize = match m.value_of("workers") { + Some(v) => value_t_or_exit!(m.value_of("workers"), usize), + None => std::cmp::min(1, num_cpus::get() / 2) as usize, + }; + let expand = match m.value_of("expand") { + Some(s) => Some(ExpandFlags::from_str(&s)?), + None => None, + }; + + do_export( + num_workers, + expand, + value_t_or_exit!(m.value_of("entity_type"), ExportEntityType), + ) } + +quick_main!(run); |