author    Bryan Newbold <bnewbold@robocracy.org>  2018-09-11 19:04:49 -0700
committer Bryan Newbold <bnewbold@robocracy.org>  2018-09-11 19:04:49 -0700
commit    459ca4e1aa4a22e4adf3c275a80368949dd52e8c (patch)
tree      a2586c1fb87723f758801919270964240ada4d78 /rust/src/bin/fatcat-export.rs
parent    f5812c8c3b062b5efb34e45702ee7df507f71e16 (diff)
first pass fast export
Diffstat (limited to 'rust/src/bin/fatcat-export.rs')
-rw-r--r--  rust/src/bin/fatcat-export.rs  161
1 file changed, 161 insertions(+), 0 deletions(-)
diff --git a/rust/src/bin/fatcat-export.rs b/rust/src/bin/fatcat-export.rs
new file mode 100644
index 00000000..d7371c87
--- /dev/null
+++ b/rust/src/bin/fatcat-export.rs
@@ -0,0 +1,161 @@
+//! JSON Export Helper
+
+extern crate clap;
+extern crate diesel;
+extern crate dotenv;
+#[macro_use]
+extern crate error_chain;
+extern crate fatcat;
+extern crate fatcat_api_spec;
+#[macro_use]
+extern crate slog;
+extern crate slog_async;
+extern crate slog_term;
+extern crate uuid;
+extern crate crossbeam_channel;
+extern crate serde_json;
+
+use clap::{App, Arg};
+use slog::{Drain, Logger};
+use dotenv::dotenv;
+use std::env;
+
+use diesel::prelude::*;
+use diesel::r2d2::ConnectionManager;
+use fatcat_api_spec::models::*;
+use std::str::FromStr;
+use uuid::Uuid;
+use fatcat::ConnectionPool;
+use fatcat::api_helpers::*;
+use fatcat::api_entity_crud::*;
+use fatcat::errors::*;
+
+use std::thread;
+//use std::io::{Stdout,StdoutLock};
+use crossbeam_channel as channel;
+//use num_cpus; TODO:
+use std::io::prelude::*;
+use std::io::{BufReader, BufWriter};
+
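+// Size of the bounded crossbeam channels; limits memory use and applies
+// backpressure between the stdin reader, the workers, and the printer thread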
+const CHANNEL_BUFFER_LEN: usize = 200;
+
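+/// One identifier-table row: the entity identifier, its current revision
+/// (absent for deleted entities), and an optional redirect target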
+struct IdentRow {
+    ident_id: FatCatId,
+    rev_id: Option<Uuid>,
+    redirect_id: Option<FatCatId>,
+}
+
+/// Create a pooled database connection from the DATABASE_URL environment variable
+pub fn database_worker_pool() -> Result<ConnectionPool> {
+    dotenv().ok();
+    let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
+    let manager = ConnectionManager::<PgConnection>::new(database_url);
+    let pool = diesel::r2d2::Pool::builder()
+        .build(manager)
+        .expect("Failed to create database pool.");
+    Ok(pool)
+}
+
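+/// Worker thread: for each ident row, fetches the release revision from the
+/// database, optionally expands sub-entities, and sends the JSON-serialized
+/// entity to the output channel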
+fn loop_work(
+    row_receiver: channel::Receiver<IdentRow>,
+    output_sender: channel::Sender<String>,
+    db_conn: &DbConn,
+    expand: Option<ExpandFlags>,
+) -> Result<()> {
+    for row in row_receiver {
+        // TODO: based on types
+        let mut entity =
+            ReleaseEntity::db_get_rev(db_conn, row.rev_id.expect("valid, non-deleted row"))?;
+        entity.state = Some("active".to_string()); // TODO
+        entity.ident = Some(row.ident_id.to_string());
+        if let Some(expand) = expand {
+            entity.db_expand(db_conn, expand)?;
+        }
+        output_sender.send(serde_json::to_string(&entity)?);
+    }
+    Ok(())
+}
+
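+/// Printer thread: writes JSON lines from the output channel to stdout, then
+/// drops done_sender so the main thread knows all output has been written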
+fn loop_printer(
+    output_receiver: channel::Receiver<String>,
+    done_sender: channel::Sender<()>,
+) -> Result<()> {
+    let output = std::io::stdout();
+    // XXX should log...
+    // let mut buf_output = BufWriter::new(output.lock());
+    let mut buf_output = BufWriter::new(output);
+    for line in output_receiver {
+        buf_output.write_all(line.as_bytes())?;
+        buf_output.write_all(b"\n")?;
+        buf_output.flush()?;
+    }
+    drop(done_sender);
+    Ok(())
+}
+
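+/// Parses one tab-separated input line: ident UUID, revision UUID, redirect
+/// ident (the redirect field is currently ignored)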
+fn parse_line(s: String) -> Result<IdentRow> {
+    let fields: Vec<String> = s.split("\t").map(|v| v.to_string()).collect();
+    if fields.len() != 3 {
+        bail!("Invalid input line");
+    }
+    Ok(IdentRow {
+        ident_id: FatCatId::from_uuid(&Uuid::from_str(&fields[0])?),
+        rev_id: Some(Uuid::from_str(&fields[1])?),
+        redirect_id: None,
+    })
+}
+
+#[test]
+fn test_parse_line() {
+    // ident UUID, rev UUID, empty redirect field
+    let line = "00000000-0000-0000-3333-000000000001\t86daea5b-1b6b-432a-bb67-ea97795f80fe\t";
+    assert!(parse_line(line.to_string()).is_ok());
+    assert!(parse_line("not a valid line".to_string()).is_err());
+}
+
+// Use num_cpus/2, or CLI arg for worker count
+//
+// 1. open buffered reader, buffered writer, and database pool. create channels
+// 2. spawn workers:
+// - get a database connection from database pool
+// - iterate over row channel, pushing Strings to output channel
+// - exit when end of rows
+// 3. spawn output printer
+// 4. read rows, pushing to row channel
+// => every N lines, log to stderr
+// 5. wait for all channels to finish, and quit
+pub fn run() -> Result<()> {
+
+    let db_pool = database_worker_pool()?;
+    let buf_input = BufReader::new(std::io::stdin());
+    let worker_count = 4;
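+    // Bounded channels: ident rows flow stdin -> workers, JSON lines flow
+    // workers -> printer; the zero-capacity "done" channel exists only so its
+    // disconnection can signal that the printer has finished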
+    let (row_sender, row_receiver) = channel::bounded(CHANNEL_BUFFER_LEN);
+    let (output_sender, output_receiver) = channel::bounded(CHANNEL_BUFFER_LEN);
+    let (done_sender, done_receiver) = channel::bounded(0);
+
+    // Start row worker threads
+    for _ in 0..worker_count {
+        let db_conn = db_pool.get().expect("database pool");
+        let row_receiver = row_receiver.clone();
+        let output_sender = output_sender.clone();
+        thread::spawn(move || loop_work(row_receiver, output_sender, &db_conn, None));
+    }
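+    // Note: worker errors are not currently propagated back to the main thread.
+    // Dropping the original output_sender below means the printer loop ends once
+    // every worker has finished and dropped its clone of the sender.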
+    drop(output_sender);
+    // Start printer thread
+    thread::spawn(move || loop_printer(output_receiver, done_sender));
+
+    for line in buf_input.lines() {
+        let line = line?;
+        let row = parse_line(line)?;
+        row_sender.send(row);
+    }
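+    // Closing the row channel lets the worker loops finish; the final recv()
+    // blocks until the printer drops done_sender, i.e. all output has been flushed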
+    drop(row_sender);
+    done_receiver.recv();
+    Ok(())
+}
+
+fn main() {
+    /*
+    let matches = App::new("server")
+        .arg(
+            Arg::with_name("https")
+                .long("https")
+                .help("Whether to use HTTPS or not"),
+        )
+        .get_matches();
+
+    let decorator = slog_term::TermDecorator::new().build();
+    let drain = slog_term::CompactFormat::new(decorator).build().fuse();
+    let drain = slog_async::Async::new(drain).build().fuse();
+    let logger = Logger::root(drain, o!());
+    */
+    run().expect("success")
+}