aboutsummaryrefslogtreecommitdiffstats
path: root/rust/src/bin/fatcat-export.rs
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2018-09-11 22:09:58 -0700
committerBryan Newbold <bnewbold@robocracy.org>2018-09-11 22:14:36 -0700
commitf7f18bb588023e869c347ddfd23bf1ed5d16e527 (patch)
tree98f39ef82561ffa74dfe64740391e595995df89c /rust/src/bin/fatcat-export.rs
parent538cd2ec602a044dc36ca81d9a2a07863080c764 (diff)
downloadfatcat-f7f18bb588023e869c347ddfd23bf1ed5d16e527.tar.gz
fatcat-f7f18bb588023e869c347ddfd23bf1ed5d16e527.zip
improvements to fatcat-export output
Diffstat (limited to 'rust/src/bin/fatcat-export.rs')
-rw-r--r--rust/src/bin/fatcat-export.rs89
1 files changed, 65 insertions, 24 deletions
diff --git a/rust/src/bin/fatcat-export.rs b/rust/src/bin/fatcat-export.rs
index 6f9b92cf..1af321ba 100644
--- a/rust/src/bin/fatcat-export.rs
+++ b/rust/src/bin/fatcat-export.rs
@@ -9,16 +9,14 @@ extern crate error_chain;
extern crate fatcat;
extern crate fatcat_api_spec;
#[macro_use]
-extern crate slog;
-extern crate slog_async;
-extern crate slog_term;
+extern crate log;
+extern crate env_logger;
extern crate uuid;
extern crate crossbeam_channel;
extern crate serde_json;
extern crate num_cpus;
use clap::{App, Arg};
-use slog::{Drain, Logger};
use dotenv::dotenv;
use std::env;
@@ -32,6 +30,7 @@ use fatcat::api_helpers::*;
use fatcat::api_entity_crud::*;
use fatcat::errors::*;
+use error_chain::ChainedError;
use std::thread;
//use std::io::{Stdout,StdoutLock};
use crossbeam_channel as channel;
@@ -71,18 +70,26 @@ pub fn database_worker_pool() -> Result<ConnectionPool> {
macro_rules! generic_loop_work {
($fn_name:ident, $entity_model:ident) => {
- fn $fn_name(row_receiver: channel::Receiver<IdentRow>, output_sender: channel::Sender<String>, db_conn: &DbConn, expand: Option<ExpandFlags>) -> Result<()> {
- for row in row_receiver {
- let mut entity = $entity_model::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?;
- //let mut entity = ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?;
- entity.state = Some("active".to_string()); // XXX
- entity.ident = Some(row.ident_id.to_string());
- if let Some(expand) = expand {
- entity.db_expand(db_conn, expand)?;
+ fn $fn_name(row_receiver: channel::Receiver<IdentRow>, output_sender: channel::Sender<String>, db_conn: &DbConn, expand: Option<ExpandFlags>) {
+ let result: Result<()> = (|| {
+ for row in row_receiver {
+ let mut entity = $entity_model::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))
+ .chain_err(|| "reading entity from database")?;
+ //let mut entity = ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?;
+ entity.state = Some("active".to_string()); // XXX
+ entity.ident = Some(row.ident_id.to_string());
+ if let Some(expand) = expand {
+ entity.db_expand(db_conn, expand)
+ .chain_err(|| "expanding sub-entities from database")?;
+ }
+ output_sender.send(serde_json::to_string(&entity)?);
}
- output_sender.send(serde_json::to_string(&entity)?);
+ Ok(())
+ })();
+ if let Err(ref e) = result {
+ error!("{}", e.display_chain())
}
- Ok(())
+ result.unwrap()
}
}
}
@@ -114,14 +121,23 @@ fn parse_line(s: String) -> Result<IdentRow> {
}
Ok(IdentRow {
ident_id: FatCatId::from_uuid(&Uuid::from_str(&fields[0])?),
- rev_id: Some(Uuid::from_str(&fields[1])?),
- redirect_id: None,
+ rev_id: match fields[1].as_ref() {
+ "\\N" => None,
+ val => Some(Uuid::from_str(&val)?),
+ },
+ redirect_id: match fields[2].as_ref() {
+ "\\N" => None,
+ val => Some(FatCatId::from_uuid(&Uuid::from_str(&val)?)),
+ },
})
}
#[test]
fn test_parse_line() {
- assert!(false)
+ assert!(parse_line("00000000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t\\N").is_ok());
+ assert!(parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t\\N").is_err());
+ assert!(parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001").is_err());
+ assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001").is_ok());
}
// Use num_cpus/2, or CLI arg for worker count
@@ -135,7 +151,7 @@ fn test_parse_line() {
// 4. read rows, pushing to row channel
// => every N lines, log to stderr
// 5. wait for all channels to finish, and quit
-pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: ExportEntityType) -> Result<()> {
+pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: ExportEntityType, redirects: bool) -> Result<()> {
let db_pool = database_worker_pool()?;
let buf_input = BufReader::new(std::io::stdin());
@@ -143,6 +159,8 @@ pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: E
let (output_sender, output_receiver) = channel::bounded(CHANNEL_BUFFER_LEN);
let (done_sender, done_receiver) = channel::bounded(0);
+ info!("Starting an export of {} entities", entity_type);
+
// Start row worker threads
assert!(num_workers > 0);
for _ in 0..num_workers {
@@ -164,15 +182,25 @@ pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: E
}
drop(output_sender);
// Start printer thread
- thread::spawn(move || loop_printer(output_receiver, done_sender));
+ thread::spawn(move || loop_printer(output_receiver, done_sender).expect("printing to stdout") );
+ let mut count = 0;
for line in buf_input.lines() {
let line = line?;
let row = parse_line(line)?;
- row_sender.send(row);
+ match (row.rev_id, row.redirect_id, redirects) {
+ (None, _, _) => (),
+ (Some(_), Some(_), false) => (),
+ _ => row_sender.send(row),
+ }
+ count += 1;
+ if count % 1000 == 0 {
+ info!("processed {} lines...", count);
+ }
}
drop(row_sender);
done_receiver.recv();
+ info!("Done reading ({} lines), waiting for workers to exit...", count);
Ok(())
}
@@ -182,28 +210,41 @@ fn run() -> Result<()> {
.version(env!("CARGO_PKG_VERSION"))
.author("Bryan Newbold <bnewbold@archive.org>")
.about("Fast exports of database entities from an id list")
+ .arg(Arg::from_usage("<entity_type> 'what entity type the idents correspond to'")
+ .possible_values(&ExportEntityType::variants())
+ .case_insensitive(true))
.args_from_usage(
- "-f --workers=[workers] 'number of threads (database connections) to use'
- --expand=[expand] 'sub-entities to include in dump'
- <entity_type> 'what entity type the idents correspond to'")
+ "-j --workers=[workers] 'number of threads (database connections) to use'
+ -q --quiet 'less status output to stderr'
+ --include-redirects 'include redirected idents (normally skipped)'
+ --expand=[expand] 'sub-entities to include in dump'")
.after_help("Reads a ident table TSV dump from stdin (aka, ident_id, rev_id, redirect_id), \
and outputs JSON (one entity per line). Database connection info read from environment \
(DATABASE_URL, same as fatcatd).")
.get_matches();
let num_workers: usize = match m.value_of("workers") {
- Some(v) => value_t_or_exit!(m.value_of("workers"), usize),
+ Some(_) => value_t_or_exit!(m.value_of("workers"), usize),
None => std::cmp::min(1, num_cpus::get() / 2) as usize,
};
let expand = match m.value_of("expand") {
Some(s) => Some(ExpandFlags::from_str(&s)?),
None => None,
};
+ let log_level = if m.is_present("quiet") {
+ "warn"
+ } else {
+ "info"
+ };
+ let env = env_logger::Env::default()
+ .filter_or(env_logger::DEFAULT_FILTER_ENV, log_level);
+ env_logger::Builder::from_env(env).init();
do_export(
num_workers,
expand,
value_t_or_exit!(m.value_of("entity_type"), ExportEntityType),
+ m.is_present("include_redirects"),
)
}