author     bnewbold <bnewbold@archive.org>  2020-08-05 18:12:49 +0000
committer  bnewbold <bnewbold@archive.org>  2020-08-05 18:12:49 +0000
commit     4f80b87722d64f27c985f0040ea177269b6e028b (patch)
tree       b1893d8ff74fbd61f97df4f41e7c7ba2c09c67a7
parent     59b772fa9af05b35ce14d26bcabb66cc124255d4 (diff)
parent     b2b830164defc13cb498ba64b1529b4b7f5f1da5 (diff)
Merge branch 'bnewbold-work-dumps' into 'master'
release dumps grouped by work_id

See merge request webgroup/fatcat!75
-rw-r--r--  extra/sql_dumps/README.md                      |   2
-rw-r--r--  extra/sql_dumps/dump_idents.sql                |  16
-rwxr-xr-x  extra/sql_dumps/ident_table_snapshot.sh        |   3
-rw-r--r--  proposals/20200804_grouped_release_exports.md  |  60
-rw-r--r--  rust/Makefile                                  |   3
-rw-r--r--  rust/src/bin/fatcat-export.rs                  | 172
6 files changed, 237 insertions(+), 19 deletions(-)
diff --git a/extra/sql_dumps/README.md b/extra/sql_dumps/README.md
index f24d3d92..1fa37981 100644
--- a/extra/sql_dumps/README.md
+++ b/extra/sql_dumps/README.md
@@ -30,7 +30,7 @@ Dump locally to stdout, eg:
Or, in production:
# production, as 'fatcat' user, in /srv/fatcat/src/rust:
- cat /tmp/fatcat_ident_releases.tsv | ./target/release/fatcat-export release --expand files,filesets,webcaptures,container -j8 | pigz > /srv/fatcat/snapshots/release_export_expanded.json.gz
+ cat /tmp/fatcat_ident_releases_by_work.tsv | ./target/release/fatcat-export releasebywork --expand files,filesets,webcaptures,container -j8 | pigz > /srv/fatcat/snapshots/release_export_expanded.json.gz
cat /tmp/fatcat_ident_creators.tsv | ./target/release/fatcat-export creator -j8 | pigz > /srv/fatcat/snapshots/creator_export.json.gz
cat /tmp/fatcat_ident_containers.tsv | ./target/release/fatcat-export container -j8 | pigz > /srv/fatcat/snapshots/container_export.json.gz
cat /tmp/fatcat_ident_files.tsv | ./target/release/fatcat-export file -j8 | pigz > /srv/fatcat/snapshots/file_export.json.gz
diff --git a/extra/sql_dumps/dump_idents.sql b/extra/sql_dumps/dump_idents.sql
index e8126347..d9777ea1 100644
--- a/extra/sql_dumps/dump_idents.sql
+++ b/extra/sql_dumps/dump_idents.sql
@@ -10,5 +10,21 @@ COPY (SELECT id, rev_id, redirect_id FROM webcapture_ident WHERE is_live=true)
COPY (SELECT id, rev_id, redirect_id FROM release_ident WHERE is_live=true) TO '/tmp/fatcat_ident_releases.tsv' WITH NULL '';
COPY (SELECT id, rev_id, redirect_id FROM work_ident WHERE is_live=true) TO '/tmp/fatcat_ident_works.tsv' WITH NULL '';
COPY (SELECT id, editgroup_id, timestamp FROM changelog) TO '/tmp/fatcat_ident_changelog.tsv' WITH NULL '';
+COPY (
+ SELECT
+ release_ident.id,
+ release_ident.rev_id,
+ release_ident.redirect_id,
+ release_rev.work_ident_id
+ FROM
+ release_ident
+ LEFT JOIN release_rev ON release_ident.rev_id = release_rev.id
+ WHERE
+ release_ident.is_live=true
+ AND release_ident.redirect_id IS NULL
+ AND release_ident.rev_id IS NOT NULL
+ ORDER BY
+ release_rev.work_ident_id ASC NULLS LAST
+) TO '/tmp/fatcat_ident_releases_by_work.tsv' WITH NULL '';
ROLLBACK;
diff --git a/extra/sql_dumps/ident_table_snapshot.sh b/extra/sql_dumps/ident_table_snapshot.sh
index dbd4caf0..b287a0ce 100755
--- a/extra/sql_dumps/ident_table_snapshot.sh
+++ b/extra/sql_dumps/ident_table_snapshot.sh
@@ -32,6 +32,7 @@ tar -C /tmp -c --verbose \
fatcat_ident_filesets.tsv \
fatcat_ident_webcaptures.tsv \
fatcat_ident_releases.tsv \
- fatcat_ident_works.tsv
+ fatcat_ident_works.tsv \
+ fatcat_ident_releases_by_work.tsv
echo "Done: $OUTFILE"
diff --git a/proposals/20200804_grouped_release_exports.md b/proposals/20200804_grouped_release_exports.md
new file mode 100644
index 00000000..d75ba687
--- /dev/null
+++ b/proposals/20200804_grouped_release_exports.md
@@ -0,0 +1,60 @@
+
+Grouped Release Exports
+=======================
+
+We want rich "work" entity dumps without duplicating the enriched release
+dumps. The motivation is to enable work-level processing of releases: for
+example, transforming into scholar.archive.org (which indexes at the work
+level), bulk downloading fulltext (where only the "best copy" per work is
+needed), citation analysis, etc.
+
+One elegant way to do this is to have release exports sorted by `work_id`,
+with all release entities sharing a `work_id` contiguous in the output,
+though each entity stays on its own line (see the illustration below).
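+
+For illustration, output grouped this way would look something like the
+following (hypothetical, abbreviated identifiers; real idents are UUIDs in
+the dump and base32-encoded strings in entity JSON):
+
+    {"ident": "release-aaa", "work_id": "work-111", ...}
+    {"ident": "release-bbb", "work_id": "work-111", ...}
+    {"ident": "release-ccc", "work_id": "work-222", ...}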
+
+The plan is to update the fatcat-export tool (Rust) to read an ident list
+which includes `work_id` (and is sorted by `work_id`).
+
+## Rust Implementation
+
+Remembering that `fatcat-export` operates on lines, and uses channels and
+worker threads for processing and serialization, we can't just assume that
+the output lines will be in the same order as the input lines (e.g., worker
+threads can shift ordering). To keep each work's releases contiguous, we will
+add a new dump mode which operates on batches (`Vec`) of rows instead, and
+writes each batch contiguously. The batches themselves may end up out of
+order, but that is fine; a sketch of the batching loop follows.
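+
+A minimal sketch of the batch-splitting loop (simplified from the
+`do_export_batch` implementation in this merge; channel setup, the
+rev/redirect filtering, and error handling are elided, and `rows` stands in
+for the parsed stdin iterator):
+
+    let mut last_group_id: Option<FatcatId> = None;
+    let mut batch: Vec<IdentRow> = vec![];
+    for row in rows {
+        // start a new batch whenever the work_id changes (or is missing)
+        if row.group_id.is_none() || row.group_id != last_group_id {
+            if !batch.is_empty() {
+                row_sender.send(batch);
+                batch = vec![];
+            }
+        }
+        last_group_id = row.group_id;
+        batch.push(row);
+    }
+    // flush the final batch
+    if !batch.is_empty() {
+        row_sender.send(batch);
+    }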
+
+## SQL Ident Dump
+
+Database timing (run 2020-08-04 on fatcat-qa):
+
+ COPY (SELECT id, rev_id, redirect_id FROM release_ident WHERE is_live=true) TO '/tmp/fatcat_ident_releases.tsv' WITH NULL '';
+ => COPY 142635153
+ => Time: 143264.483 ms (02:23.264)
+
+ COPY (
+ SELECT
+ release_ident.id,
+ release_ident.rev_id,
+ release_ident.redirect_id,
+ release_rev.work_ident_id
+ FROM
+ release_ident
+ LEFT JOIN release_rev ON release_ident.rev_id = release_rev.id
+ WHERE
+ release_ident.is_live=true
+ AND release_ident.redirect_id IS NULL
+ AND release_ident.rev_id IS NOT NULL
+ ORDER BY
+ release_rev.work_ident_id ASC NULLS LAST
+ ) TO '/tmp/fatcat_ident_releases_by_work.tsv' WITH NULL '';
+ => COPY 142635147
+ => Time: 610540.112 ms (10:10.540)
+
+Much slower, but not a blocking issue. Apparently PostgreSQL will do a full
+sequential scan (plus an explicit sort) instead of using the existing
+`work_id`-sorted index on `release_rev` when reading a large fraction of the
+table (here, almost the entire table), because it estimates the sequential
+read will be so much faster. I don't think this is true with SSDs? I haven't
+actually confirmed this is the behavior, just assuming based on the
+PostgreSQL docs.
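+
+One way to confirm (hypothetical; not yet run) would be to inspect the query
+plan for the same statement:
+
+    EXPLAIN
+    SELECT release_ident.id, release_rev.work_ident_id
+    FROM release_ident
+    LEFT JOIN release_rev ON release_ident.rev_id = release_rev.id
+    WHERE release_ident.is_live=true
+      AND release_ident.redirect_id IS NULL
+      AND release_ident.rev_id IS NOT NULL
+    ORDER BY release_rev.work_ident_id ASC NULLS LAST;
+
+A `Sort` node over a `Seq Scan` in the plan, rather than an index scan on the
+`work_ident_id` index, would confirm the assumption.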
diff --git a/rust/Makefile b/rust/Makefile
index 81fb32af..3bfc1b49 100644
--- a/rust/Makefile
+++ b/rust/Makefile
@@ -10,8 +10,7 @@ help: ## Print info about all commands
.PHONY: test
test: ## Run all tests and lints
- cargo test --
- pipenv run mypy fatcat_scholar/*.py tests/ --ignore-missing-imports
+ cargo test -- --test-threads 1
.PHONY: dev
dev: ## Run web service locally, with reloading
diff --git a/rust/src/bin/fatcat-export.rs b/rust/src/bin/fatcat-export.rs
index f52c7d17..7d671b9a 100644
--- a/rust/src/bin/fatcat-export.rs
+++ b/rust/src/bin/fatcat-export.rs
@@ -36,7 +36,8 @@ arg_enum! {
Fileset,
Webcapture,
Release,
- Work
+ Work,
+ ReleaseByWork,
}
}
@@ -44,6 +45,7 @@ struct IdentRow {
ident_id: FatcatId,
rev_id: Option<Uuid>,
redirect_id: Option<FatcatId>,
+ group_id: Option<FatcatId>,
}
macro_rules! generic_loop_work {
@@ -87,6 +89,39 @@ generic_loop_work!(loop_work_webcapture, WebcaptureEntity);
generic_loop_work!(loop_work_release, ReleaseEntity);
generic_loop_work!(loop_work_work, WorkEntity);
+fn loop_work_release_by_work(
+ row_receiver: channel::Receiver<Vec<IdentRow>>,
+ output_sender: channel::Sender<Vec<String>>,
+ db_conn: &DbConn,
+ expand: Option<ExpandFlags>,
+) {
+ let result: Result<()> = (|| {
+ for row_batch in row_receiver {
+ let mut resp_batch: Vec<String> = vec![];
+ for row in row_batch {
+ let mut entity = ReleaseEntity::db_get_rev(
+ db_conn,
+ row.rev_id.expect("valid, non-deleted row"),
+ HideFlags::none(),
+ )?; // .chain_err(|| "reading entity from database")?;
+ entity.state = Some("active".to_string()); // only active lines were passed
+ entity.ident = Some(row.ident_id.to_string());
+ if let Some(expand) = expand {
+ entity.db_expand(db_conn, expand)?
+ // chain_err(|| "expanding sub-entities from database")?;
+ }
+ resp_batch.push(serde_json::to_string(&entity)?);
+ }
+ output_sender.send(resp_batch);
+ }
+ Ok(())
+ })();
+ if let Err(ref e) = result {
+ error!("{}", e); // e.display_chain())
+ }
+ result.unwrap()
+}
+
fn loop_printer(
output_receiver: channel::Receiver<String>,
done_sender: channel::Sender<()>,
@@ -104,11 +139,37 @@ fn loop_printer(
Ok(())
}
+fn loop_batch_printer(
+ output_receiver: channel::Receiver<Vec<String>>,
+ done_sender: channel::Sender<()>,
+) -> Result<()> {
+ let output = std::io::stdout();
+ // TODO: should log?
+ // let mut buf_output = BufWriter::new(output.lock());
+ let mut buf_output = BufWriter::new(output);
+ for batch in output_receiver {
+ for line in batch {
+ buf_output.write_all(&line.into_bytes())?;
+ buf_output.write_all(b"\n")?;
+ }
+ buf_output.flush()?;
+ }
+ drop(done_sender);
+ Ok(())
+}
+
fn parse_line(s: &str) -> Result<IdentRow> {
let fields: Vec<String> = s.split('\t').map(|v| v.to_string()).collect();
- if fields.len() != 3 {
+ let group_id: Option<FatcatId> = if fields.len() == 4 {
+ match fields[3].as_ref() {
+ "" => None,
+ val => Some(FatcatId::from_uuid(&Uuid::from_str(&val)?)),
+ }
+ } else if fields.len() == 3 {
+ None
+ } else {
bail!("Invalid input line");
- }
+ };
Ok(IdentRow {
ident_id: FatcatId::from_uuid(&Uuid::from_str(&fields[0])?),
rev_id: match fields[1].as_ref() {
@@ -119,6 +180,7 @@ fn parse_line(s: &str) -> Result<IdentRow> {
"" => None,
val => Some(FatcatId::from_uuid(&Uuid::from_str(&val)?)),
},
+ group_id,
})
}
@@ -137,6 +199,9 @@ fn test_parse_line() {
.is_err()
);
assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001").is_ok());
+ assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001\t").is_ok());
+ assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001\t\t").is_err());
+ assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-000000000002").is_ok());
}
// Use num_cpus/2, or CLI arg for worker count
@@ -192,6 +257,7 @@ pub fn do_export(
ExportEntityType::Work => {
thread::spawn(move || loop_work_work(row_receiver, output_sender, &db_conn, expand))
}
+ ExportEntityType::ReleaseByWork => unimplemented!(),
};
}
drop(output_sender);
@@ -221,22 +287,97 @@ pub fn do_export(
Ok(())
}
+pub fn do_export_batch(
+ num_workers: usize,
+ expand: Option<ExpandFlags>,
+ entity_type: ExportEntityType,
+ redirects: bool,
+) -> Result<()> {
+ let db_pool = server::database_worker_pool()?;
+ let buf_input = BufReader::new(std::io::stdin());
+ let (row_sender, row_receiver) = channel::bounded(CHANNEL_BUFFER_LEN);
+ let (output_sender, output_receiver) = channel::bounded(CHANNEL_BUFFER_LEN);
+ let (done_sender, done_receiver) = channel::bounded(0);
+
+ info!("Starting an export of {} entities", entity_type);
+
+ // Start row worker threads
+ assert!(num_workers > 0);
+ for _ in 0..num_workers {
+ let db_conn = db_pool.get().expect("database pool");
+ let row_receiver = row_receiver.clone();
+ let output_sender = output_sender.clone();
+ match entity_type {
+ ExportEntityType::ReleaseByWork => thread::spawn(move || {
+ loop_work_release_by_work(row_receiver, output_sender, &db_conn, expand)
+ }),
+ _ => unimplemented!(),
+ };
+ }
+ drop(output_sender);
+ // Start printer thread
+ thread::spawn(move || {
+ loop_batch_printer(output_receiver, done_sender).expect("printing to stdout")
+ });
+
+ let mut count = 0;
+ let mut last_group_id: Option<FatcatId> = None;
+ let mut batch = vec![];
+ for line in buf_input.lines() {
+ let line = line?;
+ let row = parse_line(&line)?;
+ match (row.rev_id, row.redirect_id, redirects) {
+ (None, _, _) => (),
+ (Some(_), Some(_), false) => (),
+ _ => {
+ if row.group_id == None || row.group_id != last_group_id {
+ if batch.len() > 0 {
+ row_sender.send(batch);
+ batch = vec![];
+ }
+ }
+ last_group_id = row.group_id;
+ batch.push(row);
+ }
+ }
+ count += 1;
+ if count % 1000 == 0 {
+ info!("processed {} lines...", count);
+ }
+ }
+ if batch.len() > 0 {
+ row_sender.send(batch);
+ }
+ drop(row_sender);
+ done_receiver.recv();
+ info!(
+ "Done reading ({} lines), waiting for workers to exit...",
+ count
+ );
+ Ok(())
+}
+
fn main() -> Result<()> {
let m = App::new("fatcat-export")
.version(env!("CARGO_PKG_VERSION"))
.author("Bryan Newbold <bnewbold@archive.org>")
.about("Fast exports of database entities from an id list")
- .arg(Arg::from_usage("<entity_type> 'what entity type the idents correspond to'")
- .possible_values(&ExportEntityType::variants())
- .case_insensitive(true))
+ .arg(
+ Arg::from_usage("<entity_type> 'what entity type the idents correspond to'")
+ .possible_values(&ExportEntityType::variants())
+ .case_insensitive(true),
+ )
.args_from_usage(
"-j --workers=[workers] 'number of threads (database connections) to use'
-q --quiet 'less status output to stderr'
--include-redirects 'include redirected idents (normally skipped)'
- --expand=[expand] 'sub-entities to include in dump'")
- .after_help("Reads a ident table TSV dump from stdin (aka, ident_id, rev_id, redirect_id), \
+ --expand=[expand] 'sub-entities to include in dump'",
+ )
+ .after_help(
+ "Reads a ident table TSV dump from stdin (aka, ident_id, rev_id, redirect_id), \
and outputs JSON (one entity per line). Database connection info read from environment \
- (DATABASE_URL, same as fatcatd).")
+ (DATABASE_URL, same as fatcatd).",
+ )
.get_matches();
let num_workers: usize = match m.value_of("workers") {
@@ -255,10 +396,11 @@ fn main() -> Result<()> {
let env = env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, log_level);
env_logger::Builder::from_env(env).init();
- do_export(
- num_workers,
- expand,
- value_t_or_exit!(m.value_of("entity_type"), ExportEntityType),
- m.is_present("include_redirects"),
- )
+ let entity_type = value_t_or_exit!(m.value_of("entity_type"), ExportEntityType);
+ let include_redirects = m.is_present("include_redirects");
+ if entity_type == ExportEntityType::ReleaseByWork {
+ do_export_batch(num_workers, expand, entity_type, include_redirects)
+ } else {
+ do_export(num_workers, expand, entity_type, include_redirects)
+ }
}