From 13e62f51b71473cb30ac4efbd5e1d178d30cde7c Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 4 Aug 2020 18:02:39 -0700 Subject: group-by-work mode for fatcat-export --- rust/src/bin/fatcat-export.rs | 172 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 157 insertions(+), 15 deletions(-) (limited to 'rust') diff --git a/rust/src/bin/fatcat-export.rs b/rust/src/bin/fatcat-export.rs index f52c7d17..3cc0c7a8 100644 --- a/rust/src/bin/fatcat-export.rs +++ b/rust/src/bin/fatcat-export.rs @@ -36,7 +36,8 @@ arg_enum! { Fileset, Webcapture, Release, - Work + Work, + ReleaseByWork, } } @@ -44,6 +45,7 @@ struct IdentRow { ident_id: FatcatId, rev_id: Option, redirect_id: Option, + group_id: Option, } macro_rules! generic_loop_work { @@ -87,6 +89,39 @@ generic_loop_work!(loop_work_webcapture, WebcaptureEntity); generic_loop_work!(loop_work_release, ReleaseEntity); generic_loop_work!(loop_work_work, WorkEntity); +fn loop_work_release_by_work( + row_receiver: channel::Receiver>, + output_sender: channel::Sender>, + db_conn: &DbConn, + expand: Option, +) { + let result: Result<()> = (|| { + for row_batch in row_receiver { + let mut resp_batch: Vec = vec![]; + for row in row_batch { + let mut entity = ReleaseEntity::db_get_rev( + db_conn, + row.rev_id.expect("valid, non-deleted row"), + HideFlags::none(), + )?; // .chain_err(|| "reading entity from database")?; + entity.state = Some("active".to_string()); // only active lines were passed + entity.ident = Some(row.ident_id.to_string()); + if let Some(expand) = expand { + entity.db_expand(db_conn, expand)? + // chain_err(|| "expanding sub-entities from database")?; + } + resp_batch.push(serde_json::to_string(&entity)?); + } + output_sender.send(resp_batch); + } + Ok(()) + })(); + if let Err(ref e) = result { + error!("{}", e); // e.display_chain()) + } + result.unwrap() +} + fn loop_printer( output_receiver: channel::Receiver, done_sender: channel::Sender<()>, @@ -104,11 +139,37 @@ fn loop_printer( Ok(()) } +fn loop_batch_printer( + output_receiver: channel::Receiver>, + done_sender: channel::Sender<()>, +) -> Result<()> { + let output = std::io::stdout(); + // TODO: should log? + // let mut buf_output = BufWriter::new(output.lock()); + let mut buf_output = BufWriter::new(output); + for batch in output_receiver { + for line in batch { + buf_output.write_all(&line.into_bytes())?; + buf_output.write_all(b"\n")?; + buf_output.flush()?; + } + } + drop(done_sender); + Ok(()) +} + fn parse_line(s: &str) -> Result { let fields: Vec = s.split('\t').map(|v| v.to_string()).collect(); - if fields.len() != 3 { + let group_id: Option = if fields.len() == 4 { + match fields[3].as_ref() { + "" => None, + val => Some(FatcatId::from_uuid(&Uuid::from_str(&val)?)), + } + } else if fields.len() == 3 { + None + } else { bail!("Invalid input line"); - } + }; Ok(IdentRow { ident_id: FatcatId::from_uuid(&Uuid::from_str(&fields[0])?), rev_id: match fields[1].as_ref() { @@ -119,6 +180,7 @@ fn parse_line(s: &str) -> Result { "" => None, val => Some(FatcatId::from_uuid(&Uuid::from_str(&val)?)), }, + group_id, }) } @@ -137,6 +199,9 @@ fn test_parse_line() { .is_err() ); assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001").is_ok()); + assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001\t").is_ok()); + assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001\t\t").is_err()); + assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-000000000002").is_ok()); } // Use num_cpus/2, or CLI arg for worker count @@ -192,6 +257,7 @@ pub fn do_export( ExportEntityType::Work => { thread::spawn(move || loop_work_work(row_receiver, output_sender, &db_conn, expand)) } + ExportEntityType::ReleaseByWork => unimplemented!(), }; } drop(output_sender); @@ -221,22 +287,97 @@ pub fn do_export( Ok(()) } +pub fn do_export_batch( + num_workers: usize, + expand: Option, + entity_type: ExportEntityType, + redirects: bool, +) -> Result<()> { + let db_pool = server::database_worker_pool()?; + let buf_input = BufReader::new(std::io::stdin()); + let (row_sender, row_receiver) = channel::bounded(CHANNEL_BUFFER_LEN); + let (output_sender, output_receiver) = channel::bounded(CHANNEL_BUFFER_LEN); + let (done_sender, done_receiver) = channel::bounded(0); + + info!("Starting an export of {} entities", entity_type); + + // Start row worker threads + assert!(num_workers > 0); + for _ in 0..num_workers { + let db_conn = db_pool.get().expect("database pool"); + let row_receiver = row_receiver.clone(); + let output_sender = output_sender.clone(); + match entity_type { + ExportEntityType::ReleaseByWork => thread::spawn(move || { + loop_work_release_by_work(row_receiver, output_sender, &db_conn, expand) + }), + _ => unimplemented!(), + }; + } + drop(output_sender); + // Start printer thread + thread::spawn(move || { + loop_batch_printer(output_receiver, done_sender).expect("printing to stdout") + }); + + let mut count = 0; + let mut last_group_id: Option = None; + let mut batch = vec![]; + for line in buf_input.lines() { + let line = line?; + let row = parse_line(&line)?; + match (row.rev_id, row.redirect_id, redirects) { + (None, _, _) => (), + (Some(_), Some(_), false) => (), + _ => { + if row.group_id == None || row.group_id != last_group_id { + if batch.len() > 0 { + row_sender.send(batch); + batch = vec![]; + } + } + last_group_id = row.group_id; + batch.push(row); + } + } + count += 1; + if count % 1000 == 0 { + info!("processed {} lines...", count); + } + } + if batch.len() > 0 { + row_sender.send(batch); + } + drop(row_sender); + done_receiver.recv(); + info!( + "Done reading ({} lines), waiting for workers to exit...", + count + ); + Ok(()) +} + fn main() -> Result<()> { let m = App::new("fatcat-export") .version(env!("CARGO_PKG_VERSION")) .author("Bryan Newbold ") .about("Fast exports of database entities from an id list") - .arg(Arg::from_usage(" 'what entity type the idents correspond to'") - .possible_values(&ExportEntityType::variants()) - .case_insensitive(true)) + .arg( + Arg::from_usage(" 'what entity type the idents correspond to'") + .possible_values(&ExportEntityType::variants()) + .case_insensitive(true), + ) .args_from_usage( "-j --workers=[workers] 'number of threads (database connections) to use' -q --quiet 'less status output to stderr' --include-redirects 'include redirected idents (normally skipped)' - --expand=[expand] 'sub-entities to include in dump'") - .after_help("Reads a ident table TSV dump from stdin (aka, ident_id, rev_id, redirect_id), \ + --expand=[expand] 'sub-entities to include in dump'", + ) + .after_help( + "Reads a ident table TSV dump from stdin (aka, ident_id, rev_id, redirect_id), \ and outputs JSON (one entity per line). Database connection info read from environment \ - (DATABASE_URL, same as fatcatd).") + (DATABASE_URL, same as fatcatd).", + ) .get_matches(); let num_workers: usize = match m.value_of("workers") { @@ -255,10 +396,11 @@ fn main() -> Result<()> { let env = env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, log_level); env_logger::Builder::from_env(env).init(); - do_export( - num_workers, - expand, - value_t_or_exit!(m.value_of("entity_type"), ExportEntityType), - m.is_present("include_redirects"), - ) + let entity_type = value_t_or_exit!(m.value_of("entity_type"), ExportEntityType); + let include_redirects = m.is_present("include_redirects"); + if entity_type == ExportEntityType::ReleaseByWork { + do_export_batch(num_workers, expand, entity_type, include_redirects) + } else { + do_export(num_workers, expand, entity_type, include_redirects) + } } -- cgit v1.2.3