aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rust/Cargo.lock1
-rw-r--r--rust/Cargo.toml3
-rw-r--r--rust/src/bin/fatcat-export.rs105
3 files changed, 80 insertions, 29 deletions
diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index 6d12659e..2a7d6017 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -365,6 +365,7 @@ dependencies = [
"iron-test 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)",
"sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 0e14c583..7c6695d5 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -13,7 +13,7 @@ fatcat-api-spec = {version = "*", path = "fatcat-api-spec"}
diesel = { version = "1.3", features = ["postgres", "uuid", "serde_json", "chrono", "r2d2"] }
diesel_migrations = "1.3"
dotenv = "0.9.0"
-clap = "*"
+clap = "2"
error-chain = "0.12"
uuid = "0.5"
log = "*"
@@ -37,6 +37,7 @@ serde_json = "1.0"
# Command-line tools
crossbeam-channel = "0.2"
+num_cpus = "1"
# Unused (hyper 0.11 and https)
#hyper-openssl = {version = "0.2", optional = true}
diff --git a/rust/src/bin/fatcat-export.rs b/rust/src/bin/fatcat-export.rs
index d7371c87..6f9b92cf 100644
--- a/rust/src/bin/fatcat-export.rs
+++ b/rust/src/bin/fatcat-export.rs
@@ -1,5 +1,6 @@
//! JSON Export Helper
+#[macro_use]
extern crate clap;
extern crate diesel;
extern crate dotenv;
@@ -14,6 +15,7 @@ extern crate slog_term;
extern crate uuid;
extern crate crossbeam_channel;
extern crate serde_json;
+extern crate num_cpus;
use clap::{App, Arg};
use slog::{Drain, Logger};
@@ -39,6 +41,17 @@ use std::io::{BufReader, BufWriter};
const CHANNEL_BUFFER_LEN: usize = 200;
+arg_enum!{
+ #[derive(PartialEq, Debug, Clone, Copy)]
+ pub enum ExportEntityType {
+ Creator,
+ Container,
+ File,
+ Release,
+ Work
+ }
+}
+
struct IdentRow {
ident_id: FatCatId,
rev_id: Option<Uuid>,
@@ -56,20 +69,30 @@ pub fn database_worker_pool() -> Result<ConnectionPool> {
Ok(pool)
}
-fn loop_work(row_receiver: channel::Receiver<IdentRow>, output_sender: channel::Sender<String>, db_conn: &DbConn, expand: Option<ExpandFlags>) -> Result<()> {
- for row in row_receiver {
- // TODO: based on types
- let mut entity = ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?;
- entity.state = Some("active".to_string()); // TODO
- entity.ident = Some(row.ident_id.to_string());
- if let Some(expand) = expand {
- entity.db_expand(db_conn, expand)?;
+macro_rules! generic_loop_work {
+ ($fn_name:ident, $entity_model:ident) => {
+ fn $fn_name(row_receiver: channel::Receiver<IdentRow>, output_sender: channel::Sender<String>, db_conn: &DbConn, expand: Option<ExpandFlags>) -> Result<()> {
+ for row in row_receiver {
+ let mut entity = $entity_model::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?;
+ //let mut entity = ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?;
+ entity.state = Some("active".to_string()); // XXX
+ entity.ident = Some(row.ident_id.to_string());
+ if let Some(expand) = expand {
+ entity.db_expand(db_conn, expand)?;
+ }
+ output_sender.send(serde_json::to_string(&entity)?);
+ }
+ Ok(())
}
- output_sender.send(serde_json::to_string(&entity)?);
}
- Ok(())
}
+generic_loop_work!(loop_work_container, ContainerEntity);
+generic_loop_work!(loop_work_creator, CreatorEntity);
+generic_loop_work!(loop_work_file, FileEntity);
+generic_loop_work!(loop_work_release, ReleaseEntity);
+generic_loop_work!(loop_work_work, WorkEntity);
+
fn loop_printer(output_receiver: channel::Receiver<String>, done_sender: channel::Sender<()>) -> Result<()> {
let output = std::io::stdout();
// XXX should log...
@@ -112,21 +135,32 @@ fn test_parse_line() {
// 4. read rows, pushing to row channel
// => every N lines, log to stderr
// 5. wait for all channels to finish, and quit
-pub fn run() -> Result<()> {
+pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: ExportEntityType) -> Result<()> {
let db_pool = database_worker_pool()?;
let buf_input = BufReader::new(std::io::stdin());
- let worker_count = 4;
let (row_sender, row_receiver) = channel::bounded(CHANNEL_BUFFER_LEN);
let (output_sender, output_receiver) = channel::bounded(CHANNEL_BUFFER_LEN);
let (done_sender, done_receiver) = channel::bounded(0);
// Start row worker threads
- for _ in 0..worker_count {
+ assert!(num_workers > 0);
+ for _ in 0..num_workers {
let db_conn = db_pool.get().expect("database pool");
let row_receiver = row_receiver.clone();
let output_sender = output_sender.clone();
- thread::spawn(move || loop_work(row_receiver, output_sender, &db_conn, None));
+ match entity_type {
+ ExportEntityType::Container =>
+ thread::spawn(move || loop_work_container(row_receiver, output_sender, &db_conn, expand)),
+ ExportEntityType::Creator =>
+ thread::spawn(move || loop_work_creator(row_receiver, output_sender, &db_conn, expand)),
+ ExportEntityType::File =>
+ thread::spawn(move || loop_work_file(row_receiver, output_sender, &db_conn, expand)),
+ ExportEntityType::Release =>
+ thread::spawn(move || loop_work_release(row_receiver, output_sender, &db_conn, expand)),
+ ExportEntityType::Work =>
+ thread::spawn(move || loop_work_work(row_receiver, output_sender, &db_conn, expand)),
+ };
}
drop(output_sender);
// Start printer thread
@@ -142,20 +176,35 @@ pub fn run() -> Result<()> {
Ok(())
}
-fn main() {
- /*
- let matches = App::new("server")
- .arg(
- Arg::with_name("https")
- .long("https")
- .help("Whether to use HTTPS or not"),
- )
+fn run() -> Result<()> {
+
+ let m = App::new("fatcat-export")
+ .version(env!("CARGO_PKG_VERSION"))
+ .author("Bryan Newbold <bnewbold@archive.org>")
+ .about("Fast exports of database entities from an id list")
+ .args_from_usage(
+ "-f --workers=[workers] 'number of threads (database connections) to use'
+ --expand=[expand] 'sub-entities to include in dump'
+ <entity_type> 'what entity type the idents correspond to'")
+ .after_help("Reads a ident table TSV dump from stdin (aka, ident_id, rev_id, redirect_id), \
+ and outputs JSON (one entity per line). Database connection info read from environment \
+ (DATABASE_URL, same as fatcatd).")
.get_matches();
- let decorator = slog_term::TermDecorator::new().build();
- let drain = slog_term::CompactFormat::new(decorator).build().fuse();
- let drain = slog_async::Async::new(drain).build().fuse();
- let logger = Logger::root(drain, o!());
- */
- run().expect("success")
+ let num_workers: usize = match m.value_of("workers") {
+ Some(v) => value_t_or_exit!(m.value_of("workers"), usize),
+ None => std::cmp::min(1, num_cpus::get() / 2) as usize,
+ };
+ let expand = match m.value_of("expand") {
+ Some(s) => Some(ExpandFlags::from_str(&s)?),
+ None => None,
+ };
+
+ do_export(
+ num_workers,
+ expand,
+ value_t_or_exit!(m.value_of("entity_type"), ExportEntityType),
+ )
}
+
+quick_main!(run);