aboutsummaryrefslogtreecommitdiffstats
path: root/rust/src
diff options
context:
space:
mode:
Diffstat (limited to 'rust/src')
-rw-r--r--rust/src/bin/fatcat-export.rs78
1 files changed, 50 insertions, 28 deletions
diff --git a/rust/src/bin/fatcat-export.rs b/rust/src/bin/fatcat-export.rs
index fdb69e26..0d6d69b1 100644
--- a/rust/src/bin/fatcat-export.rs
+++ b/rust/src/bin/fatcat-export.rs
@@ -10,11 +10,11 @@ extern crate fatcat;
extern crate fatcat_api_spec;
#[macro_use]
extern crate log;
-extern crate env_logger;
-extern crate uuid;
extern crate crossbeam_channel;
-extern crate serde_json;
+extern crate env_logger;
extern crate num_cpus;
+extern crate serde_json;
+extern crate uuid;
use clap::{App, Arg};
use dotenv::dotenv;
@@ -22,13 +22,13 @@ use std::env;
use diesel::prelude::*;
use diesel::r2d2::ConnectionManager;
+use fatcat::api_entity_crud::*;
+use fatcat::api_helpers::*;
+use fatcat::errors::*;
+use fatcat::ConnectionPool;
use fatcat_api_spec::models::*;
use std::str::FromStr;
use uuid::Uuid;
-use fatcat::ConnectionPool;
-use fatcat::api_helpers::*;
-use fatcat::api_entity_crud::*;
-use fatcat::errors::*;
use error_chain::ChainedError;
use std::thread;
@@ -100,7 +100,10 @@ generic_loop_work!(loop_work_file, FileEntity);
generic_loop_work!(loop_work_release, ReleaseEntity);
generic_loop_work!(loop_work_work, WorkEntity);
-fn loop_printer(output_receiver: channel::Receiver<String>, done_sender: channel::Sender<()>) -> Result<()> {
+fn loop_printer(
+ output_receiver: channel::Receiver<String>,
+ done_sender: channel::Sender<()>,
+) -> Result<()> {
let output = std::io::stdout();
// XXX should log...
// let mut buf_output = BufWriter::new(output.lock());
@@ -134,9 +137,18 @@ fn parse_line(s: &str) -> Result<IdentRow> {
#[test]
fn test_parse_line() {
- assert!(parse_line("00000000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t").is_ok());
- assert!(parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t").is_err());
- assert!(parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001").is_err());
+ assert!(
+ parse_line("00000000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t")
+ .is_ok()
+ );
+ assert!(
+ parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001\t")
+ .is_err()
+ );
+ assert!(
+ parse_line("00000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-fff000000001")
+ .is_err()
+ );
assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001").is_ok());
}
@@ -151,8 +163,12 @@ fn test_parse_line() {
// 4. read rows, pushing to row channel
// => every N lines, log to stderr
// 5. wait for all channels to finish, and quit
-pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: ExportEntityType, redirects: bool) -> Result<()> {
-
+pub fn do_export(
+ num_workers: usize,
+ expand: Option<ExpandFlags>,
+ entity_type: ExportEntityType,
+ redirects: bool,
+) -> Result<()> {
let db_pool = database_worker_pool()?;
let buf_input = BufReader::new(std::io::stdin());
let (row_sender, row_receiver) = channel::bounded(CHANNEL_BUFFER_LEN);
@@ -168,21 +184,26 @@ pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: E
let row_receiver = row_receiver.clone();
let output_sender = output_sender.clone();
match entity_type {
- ExportEntityType::Container =>
- thread::spawn(move || loop_work_container(row_receiver, output_sender, &db_conn, expand)),
- ExportEntityType::Creator =>
- thread::spawn(move || loop_work_creator(row_receiver, output_sender, &db_conn, expand)),
- ExportEntityType::File =>
- thread::spawn(move || loop_work_file(row_receiver, output_sender, &db_conn, expand)),
- ExportEntityType::Release =>
- thread::spawn(move || loop_work_release(row_receiver, output_sender, &db_conn, expand)),
- ExportEntityType::Work =>
- thread::spawn(move || loop_work_work(row_receiver, output_sender, &db_conn, expand)),
+ ExportEntityType::Container => thread::spawn(move || {
+ loop_work_container(row_receiver, output_sender, &db_conn, expand)
+ }),
+ ExportEntityType::Creator => thread::spawn(move || {
+ loop_work_creator(row_receiver, output_sender, &db_conn, expand)
+ }),
+ ExportEntityType::File => {
+ thread::spawn(move || loop_work_file(row_receiver, output_sender, &db_conn, expand))
+ }
+ ExportEntityType::Release => thread::spawn(move || {
+ loop_work_release(row_receiver, output_sender, &db_conn, expand)
+ }),
+ ExportEntityType::Work => {
+ thread::spawn(move || loop_work_work(row_receiver, output_sender, &db_conn, expand))
+ }
};
}
drop(output_sender);
// Start printer thread
- thread::spawn(move || loop_printer(output_receiver, done_sender).expect("printing to stdout") );
+ thread::spawn(move || loop_printer(output_receiver, done_sender).expect("printing to stdout"));
let mut count = 0;
for line in buf_input.lines() {
@@ -200,12 +221,14 @@ pub fn do_export(num_workers: usize, expand: Option<ExpandFlags>, entity_type: E
}
drop(row_sender);
done_receiver.recv();
- info!("Done reading ({} lines), waiting for workers to exit...", count);
+ info!(
+ "Done reading ({} lines), waiting for workers to exit...",
+ count
+ );
Ok(())
}
fn run() -> Result<()> {
-
let m = App::new("fatcat-export")
.version(env!("CARGO_PKG_VERSION"))
.author("Bryan Newbold <bnewbold@archive.org>")
@@ -236,8 +259,7 @@ fn run() -> Result<()> {
} else {
"info"
};
- let env = env_logger::Env::default()
- .filter_or(env_logger::DEFAULT_FILTER_ENV, log_level);
+ let env = env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, log_level);
env_logger::Builder::from_env(env).init();
do_export(