| author | bnewbold <bnewbold@archive.org> | 2020-08-05 18:12:49 +0000 |
|---|---|---|
| committer | bnewbold <bnewbold@archive.org> | 2020-08-05 18:12:49 +0000 |
| commit | 4f80b87722d64f27c985f0040ea177269b6e028b | |
| tree | b1893d8ff74fbd61f97df4f41e7c7ba2c09c67a7 | |
| parent | 59b772fa9af05b35ce14d26bcabb66cc124255d4 | |
| parent | b2b830164defc13cb498ba64b1529b4b7f5f1da5 | |
Merge branch 'bnewbold-work-dumps' into 'master'
release dumps grouped by work_id
See merge request webgroup/fatcat!75
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | extra/sql_dumps/README.md | 2 |
| -rw-r--r-- | extra/sql_dumps/dump_idents.sql | 16 |
| -rwxr-xr-x | extra/sql_dumps/ident_table_snapshot.sh | 3 |
| -rw-r--r-- | proposals/20200804_grouped_release_exports.md | 60 |
| -rw-r--r-- | rust/Makefile | 3 |
| -rw-r--r-- | rust/src/bin/fatcat-export.rs | 172 |
6 files changed, 237 insertions, 19 deletions
diff --git a/extra/sql_dumps/README.md b/extra/sql_dumps/README.md
index f24d3d92..1fa37981 100644
--- a/extra/sql_dumps/README.md
+++ b/extra/sql_dumps/README.md
@@ -30,7 +30,7 @@ Dump locally to stdout, eg:
 Or, in production:
 
     # production, as 'fatcat' user, in /srv/fatcat/src/rust:
-    cat /tmp/fatcat_ident_releases.tsv | ./target/release/fatcat-export release --expand files,filesets,webcaptures,container -j8 | pigz > /srv/fatcat/snapshots/release_export_expanded.json.gz
+    cat /tmp/fatcat_ident_releases_by_work.tsv | ./target/release/fatcat-export releasebywork --expand files,filesets,webcaptures,container -j8 | pigz > /srv/fatcat/snapshots/release_export_expanded.json.gz
     cat /tmp/fatcat_ident_creators.tsv | ./target/release/fatcat-export creator -j8 | pigz > /srv/fatcat/snapshots/creator_export.json.gz
     cat /tmp/fatcat_ident_containers.tsv | ./target/release/fatcat-export container -j8 | pigz > /srv/fatcat/snapshots/container_export.json.gz
     cat /tmp/fatcat_ident_files.tsv | ./target/release/fatcat-export file -j8 | pigz > /srv/fatcat/snapshots/file_export.json.gz
diff --git a/extra/sql_dumps/dump_idents.sql b/extra/sql_dumps/dump_idents.sql
index e8126347..d9777ea1 100644
--- a/extra/sql_dumps/dump_idents.sql
+++ b/extra/sql_dumps/dump_idents.sql
@@ -10,5 +10,21 @@ COPY (SELECT id, rev_id, redirect_id FROM webcapture_ident   WHERE is_live=true)
 COPY (SELECT id, rev_id, redirect_id FROM release_ident      WHERE is_live=true) TO '/tmp/fatcat_ident_releases.tsv'            WITH NULL '';
 COPY (SELECT id, rev_id, redirect_id FROM work_ident         WHERE is_live=true) TO '/tmp/fatcat_ident_works.tsv'               WITH NULL '';
 COPY (SELECT id, editgroup_id, timestamp FROM changelog)                         TO '/tmp/fatcat_ident_changelog.tsv'           WITH NULL '';
+COPY (
+  SELECT
+    release_ident.id,
+    release_ident.rev_id,
+    release_ident.redirect_id,
+    release_rev.work_ident_id
+  FROM
+    release_ident
+    LEFT JOIN release_rev ON release_ident.rev_id = release_rev.id
+  WHERE
+    release_ident.is_live=true
+    AND release_ident.redirect_id IS NULL
+    AND release_ident.rev_id IS NOT NULL
+  ORDER BY
+    release_rev.work_ident_id ASC NULLS LAST
+) TO '/tmp/fatcat_ident_releases_by_work.tsv'            WITH NULL '';
 
 ROLLBACK;
diff --git a/extra/sql_dumps/ident_table_snapshot.sh b/extra/sql_dumps/ident_table_snapshot.sh
index dbd4caf0..b287a0ce 100755
--- a/extra/sql_dumps/ident_table_snapshot.sh
+++ b/extra/sql_dumps/ident_table_snapshot.sh
@@ -32,6 +32,7 @@ tar -C /tmp -c --verbose \
     fatcat_ident_filesets.tsv \
     fatcat_ident_webcaptures.tsv \
     fatcat_ident_releases.tsv \
-    fatcat_ident_works.tsv
+    fatcat_ident_works.tsv \
+    fatcat_ident_releases_by_work.tsv
 
 echo "Done: $OUTFILE"
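A note on the motivation (see also the proposal below): because `/tmp/fatcat_ident_releases_by_work.tsv` is sorted by `work_ident_id`, the expanded release dump produced by the README command above has all releases of a given work on adjacent lines. A downstream consumer can then stream the dump and process one work at a time, with no global sort and no giant in-memory map. A minimal sketch of such a consumer (not from this commit; assumes a `serde_json` dependency and that each JSON line carries the release's `work_id` field; `process_work` is a placeholder):

```rust
use std::io::{self, BufRead};

// Placeholder for work-level logic: pick the "best" fulltext copy,
// build a scholar.archive.org document, run citation analysis, etc.
fn process_work(releases: &[serde_json::Value]) {
    println!("work with {} release(s)", releases.len());
}

fn main() -> io::Result<()> {
    let stdin = io::stdin();
    let mut current_work: Option<String> = None;
    let mut group: Vec<serde_json::Value> = vec![];
    for line in stdin.lock().lines() {
        let entity: serde_json::Value =
            serde_json::from_str(&line?).expect("valid JSON line");
        let work_id = entity["work_id"].as_str().map(String::from);
        // grouped dump: a change in work_id means the previous work is complete
        if work_id != current_work && !group.is_empty() {
            process_work(&group);
            group.clear();
        }
        current_work = work_id;
        group.push(entity);
    }
    if !group.is_empty() {
        process_work(&group);
    }
    Ok(())
}
```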
diff --git a/proposals/20200804_grouped_release_exports.md b/proposals/20200804_grouped_release_exports.md
new file mode 100644
index 00000000..d75ba687
--- /dev/null
+++ b/proposals/20200804_grouped_release_exports.md
@@ -0,0 +1,60 @@
+
+Grouped Release Exports
+=======================
+
+Want to have rich "work" entity dumps without duplicating the enriched
+release dumps. The motivation is to enable work-level processing of
+releases: for example, transforming into scholar.archive.org documents
+(indexed at the work level), bulk-downloading fulltext (only the "best
+copy for the entire work" is needed), citation analysis, etc.
+
+One elegant way to do this is to have release exports sorted by
+`work_id`, with all release entities sharing the same `work_id`
+contiguous in the output (though not all on the same line).
+
+The plan is to update the fatcat-export tool (rust) to read an ident
+list that includes `work_id` (and is sorted by `work_id`).
+
+## Rust Implementation
+
+Remembering that `fatcat-export` operates on lines, and uses channels and
+worker threads for processing and serialization, we can't assume that
+output lines will be in the same order as input lines (worker threads can
+shift the order). To preserve grouping, we will add a new dump mode which
+operates on batches (`Vec`) of rows instead, and writes each batch
+contiguously. The batches themselves may end up out of order, but that is
+fine.
+
+## SQL Ident Dump
+
+Database timing (running 2020-08-04 on fatcat-qa):
+
+    COPY (SELECT id, rev_id, redirect_id FROM release_ident      WHERE is_live=true) TO '/tmp/fatcat_ident_releases.tsv'            WITH NULL '';
+    => COPY 142635153
+    => Time: 143264.483 ms (02:23.264)
+
+    COPY (
+      SELECT
+        release_ident.id,
+        release_ident.rev_id,
+        release_ident.redirect_id,
+        release_rev.work_ident_id
+      FROM
+        release_ident
+        LEFT JOIN release_rev ON release_ident.rev_id = release_rev.id
+      WHERE
+        release_ident.is_live=true
+        AND release_ident.redirect_id IS NULL
+        AND release_ident.rev_id IS NOT NULL
+      ORDER BY
+        release_rev.work_ident_id ASC NULLS LAST
+    ) TO '/tmp/fatcat_ident_releases_by_work.tsv'            WITH NULL '';
+    => COPY 142635147
+    => Time: 610540.112 ms (10:10.540)
+
+Much slower, but not a blocking issue. Apparently postgresql will do a
+full rowscan instead of using the existing `work_id`-sorted index on
+`release_rev` when reading a large fraction of the table (here almost the
+entire table), because it estimates the sequential read will be much
+faster. I don't think that holds for SSDs? Haven't actually confirmed
+this behavior, just assuming based on the postgresql docs.
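The flush-on-group-change rule sketched in the "Rust Implementation" section above is what `do_export_batch` implements in the fatcat-export.rs diff below: a batch is cut whenever the `group_id` changes (or is missing), so a work's releases can never straddle two batches, even though whole batches may later be reordered by the worker threads. Distilled into a standalone sketch (hypothetical `Row` type, not fatcat code):

```rust
#[derive(Debug)]
struct Row {
    ident: String,
    group_id: Option<String>,
}

// Cut a new batch at every group boundary; rows without a group always
// flush. Input must already be sorted by group_id for groups to be whole.
fn into_batches(rows: Vec<Row>) -> Vec<Vec<Row>> {
    let mut batches: Vec<Vec<Row>> = vec![];
    let mut batch: Vec<Row> = vec![];
    let mut last_group: Option<String> = None;
    for row in rows {
        if (row.group_id.is_none() || row.group_id != last_group) && !batch.is_empty() {
            batches.push(std::mem::take(&mut batch));
        }
        last_group = row.group_id.clone();
        batch.push(row);
    }
    if !batch.is_empty() {
        batches.push(batch);
    }
    batches
}

fn main() {
    let rows = vec![
        Row { ident: "r1".into(), group_id: Some("w1".into()) },
        Row { ident: "r2".into(), group_id: Some("w1".into()) },
        Row { ident: "r3".into(), group_id: Some("w2".into()) },
    ];
    let batches = into_batches(rows);
    assert_eq!(batches.len(), 2);
    assert_eq!(batches[0].len(), 2); // both "w1" releases stay together
    println!("{:?}", batches);
}
```

As for the sequential-scan question at the end of the proposal: running `EXPLAIN (ANALYZE, BUFFERS)` on the inner SELECT would show the actual plan, and `random_page_cost` is the planner setting that encodes the spinning-disk assumption about random reads; lowering it is the usual tuning for SSD-backed databases. Neither has been tried here.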
diff --git a/rust/Makefile b/rust/Makefile
index 81fb32af..3bfc1b49 100644
--- a/rust/Makefile
+++ b/rust/Makefile
@@ -10,8 +10,7 @@ help: ## Print info about all commands
 
 .PHONY: test
 test: ## Run all tests and lints
-	cargo test --
-	pipenv run mypy fatcat_scholar/*.py tests/ --ignore-missing-imports
+	cargo test -- --test-threads 1
 
 .PHONY: dev
 dev: ## Run web service locally, with reloading
diff --git a/rust/src/bin/fatcat-export.rs b/rust/src/bin/fatcat-export.rs
index f52c7d17..7d671b9a 100644
--- a/rust/src/bin/fatcat-export.rs
+++ b/rust/src/bin/fatcat-export.rs
@@ -36,7 +36,8 @@ arg_enum! {
         Fileset,
         Webcapture,
         Release,
-        Work
+        Work,
+        ReleaseByWork,
     }
 }
 
@@ -44,6 +45,7 @@ struct IdentRow {
     ident_id: FatcatId,
     rev_id: Option<Uuid>,
     redirect_id: Option<FatcatId>,
+    group_id: Option<FatcatId>,
 }
 
 macro_rules! generic_loop_work {
@@ -87,6 +89,39 @@ generic_loop_work!(loop_work_webcapture, WebcaptureEntity);
 generic_loop_work!(loop_work_release, ReleaseEntity);
 generic_loop_work!(loop_work_work, WorkEntity);
 
+fn loop_work_release_by_work(
+    row_receiver: channel::Receiver<Vec<IdentRow>>,
+    output_sender: channel::Sender<Vec<String>>,
+    db_conn: &DbConn,
+    expand: Option<ExpandFlags>,
+) {
+    let result: Result<()> = (|| {
+        for row_batch in row_receiver {
+            let mut resp_batch: Vec<String> = vec![];
+            for row in row_batch {
+                let mut entity = ReleaseEntity::db_get_rev(
+                    db_conn,
+                    row.rev_id.expect("valid, non-deleted row"),
+                    HideFlags::none(),
+                )?; // .chain_err(|| "reading entity from database")?;
+                entity.state = Some("active".to_string()); // only active lines were passed
+                entity.ident = Some(row.ident_id.to_string());
+                if let Some(expand) = expand {
+                    entity.db_expand(db_conn, expand)?
+                    // chain_err(|| "expanding sub-entities from database")?;
+                }
+                resp_batch.push(serde_json::to_string(&entity)?);
+            }
+            output_sender.send(resp_batch);
+        }
+        Ok(())
+    })();
+    if let Err(ref e) = result {
+        error!("{}", e); // e.display_chain())
+    }
+    result.unwrap()
+}
+
 fn loop_printer(
     output_receiver: channel::Receiver<String>,
     done_sender: channel::Sender<()>,
@@ -104,11 +139,37 @@
     Ok(())
 }
 
+fn loop_batch_printer(
+    output_receiver: channel::Receiver<Vec<String>>,
+    done_sender: channel::Sender<()>,
+) -> Result<()> {
+    let output = std::io::stdout();
+    // TODO: should log?
+    // let mut buf_output = BufWriter::new(output.lock());
+    let mut buf_output = BufWriter::new(output);
+    for batch in output_receiver {
+        for line in batch {
+            buf_output.write_all(&line.into_bytes())?;
+            buf_output.write_all(b"\n")?;
+        }
+        buf_output.flush()?;
+    }
+    drop(done_sender);
+    Ok(())
+}
+
 fn parse_line(s: &str) -> Result<IdentRow> {
     let fields: Vec<String> = s.split('\t').map(|v| v.to_string()).collect();
-    if fields.len() != 3 {
+    let group_id: Option<FatcatId> = if fields.len() == 4 {
+        match fields[3].as_ref() {
+            "" => None,
+            val => Some(FatcatId::from_uuid(&Uuid::from_str(&val)?)),
+        }
+    } else if fields.len() == 3 {
+        None
+    } else {
         bail!("Invalid input line");
-    }
+    };
     Ok(IdentRow {
         ident_id: FatcatId::from_uuid(&Uuid::from_str(&fields[0])?),
         rev_id: match fields[1].as_ref() {
@@ -119,6 +180,7 @@ fn parse_line(s: &str) -> Result<IdentRow> {
             "" => None,
             val => Some(FatcatId::from_uuid(&Uuid::from_str(&val)?)),
         },
+        group_id,
     })
 }
 
@@ -137,6 +199,9 @@ fn test_parse_line() {
             .is_err()
     );
     assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001").is_ok());
+    assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001\t").is_ok());
+    assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001\t\t").is_err());
+    assert!(parse_line("00000000-0000-0000-3333-000000000002\t00000000-0000-0000-3333-fff000000001\t00000000-0000-0000-3333-000000000001\t00000000-0000-0000-3333-000000000002").is_ok());
 }
 
 // Use num_cpus/2, or CLI arg for worker count
@@ -192,6 +257,7 @@
             ExportEntityType::Work => {
                 thread::spawn(move || loop_work_work(row_receiver, output_sender, &db_conn, expand))
             }
+            ExportEntityType::ReleaseByWork => unimplemented!(),
         };
     }
     drop(output_sender);
@@ -221,22 +287,97 @@
     Ok(())
 }
 
+pub fn do_export_batch(
+    num_workers: usize,
+    expand: Option<ExpandFlags>,
+    entity_type: ExportEntityType,
+    redirects: bool,
+) -> Result<()> {
+    let db_pool = server::database_worker_pool()?;
+    let buf_input = BufReader::new(std::io::stdin());
+    let (row_sender, row_receiver) = channel::bounded(CHANNEL_BUFFER_LEN);
+    let (output_sender, output_receiver) = channel::bounded(CHANNEL_BUFFER_LEN);
+    let (done_sender, done_receiver) = channel::bounded(0);
+
+    info!("Starting an export of {} entities", entity_type);
+
+    // Start row worker threads
+    assert!(num_workers > 0);
+    for _ in 0..num_workers {
+        let db_conn = db_pool.get().expect("database pool");
+        let row_receiver = row_receiver.clone();
+        let output_sender = output_sender.clone();
+        match entity_type {
+            ExportEntityType::ReleaseByWork => thread::spawn(move || {
+                loop_work_release_by_work(row_receiver, output_sender, &db_conn, expand)
+            }),
+            _ => unimplemented!(),
+        };
+    }
+    drop(output_sender);
+    // Start printer thread
+    thread::spawn(move || {
+        loop_batch_printer(output_receiver, done_sender).expect("printing to stdout")
+    });
+
+    let mut count = 0;
+    let mut last_group_id: Option<FatcatId> = None;
+    let mut batch = vec![];
+    for line in buf_input.lines() {
+        let line = line?;
+        let row = parse_line(&line)?;
+        match (row.rev_id, row.redirect_id, redirects) {
+            (None, _, _) => (),
+            (Some(_), Some(_), false) => (),
+            _ => {
+                if row.group_id == None || row.group_id != last_group_id {
+                    if batch.len() > 0 {
+                        row_sender.send(batch);
+                        batch = vec![];
+                    }
+                }
+                last_group_id = row.group_id;
+                batch.push(row);
+            }
+        }
+        count += 1;
+        if count % 1000 == 0 {
+            info!("processed {} lines...", count);
+        }
+    }
+    if batch.len() > 0 {
+        row_sender.send(batch);
+    }
+    drop(row_sender);
+    done_receiver.recv();
+    info!(
+        "Done reading ({} lines), waiting for workers to exit...",
+        count
+    );
+    Ok(())
+}
+
 fn main() -> Result<()> {
     let m = App::new("fatcat-export")
         .version(env!("CARGO_PKG_VERSION"))
         .author("Bryan Newbold <bnewbold@archive.org>")
         .about("Fast exports of database entities from an id list")
-        .arg(Arg::from_usage("<entity_type> 'what entity type the idents correspond to'")
-            .possible_values(&ExportEntityType::variants())
-            .case_insensitive(true))
+        .arg(
+            Arg::from_usage("<entity_type> 'what entity type the idents correspond to'")
+                .possible_values(&ExportEntityType::variants())
+                .case_insensitive(true),
+        )
         .args_from_usage(
             "-j --workers=[workers] 'number of threads (database connections) to use'
              -q --quiet 'less status output to stderr'
              --include-redirects 'include redirected idents (normally skipped)'
-             --expand=[expand] 'sub-entities to include in dump'")
-        .after_help("Reads a ident table TSV dump from stdin (aka, ident_id, rev_id, redirect_id), \
+             --expand=[expand] 'sub-entities to include in dump'",
+        )
+        .after_help(
+            "Reads a ident table TSV dump from stdin (aka, ident_id, rev_id, redirect_id), \
             and outputs JSON (one entity per line). Database connection info read from environment \
-            (DATABASE_URL, same as fatcatd).")
+            (DATABASE_URL, same as fatcatd).",
+        )
        .get_matches();
 
     let num_workers: usize = match m.value_of("workers") {
@@ -255,10 +396,11 @@ fn main() -> Result<()> {
     let env = env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, log_level);
     env_logger::Builder::from_env(env).init();
 
-    do_export(
-        num_workers,
-        expand,
-        value_t_or_exit!(m.value_of("entity_type"), ExportEntityType),
-        m.is_present("include_redirects"),
-    )
+    let entity_type = value_t_or_exit!(m.value_of("entity_type"), ExportEntityType);
+    let include_redirects = m.is_present("include_redirects");
+    if entity_type == ExportEntityType::ReleaseByWork {
+        do_export_batch(num_workers, expand, entity_type, include_redirects)
+    } else {
+        do_export(num_workers, expand, entity_type, include_redirects)
+    }
 }
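One more hypothetical sketch, this time for verifying output: if the dump is correctly grouped, every `work_id` appears as exactly one contiguous run of lines. A small checker along these lines (not from this commit; assumes `serde_json` and a JSON-lines dump on stdin) could be run against a snapshot as a sanity check:

```rust
use std::collections::HashSet;
use std::io::{self, BufRead};

fn main() -> io::Result<()> {
    let stdin = io::stdin();
    let mut seen: HashSet<String> = HashSet::new();
    let mut current: Option<String> = None;
    for (n, line) in stdin.lock().lines().enumerate() {
        let entity: serde_json::Value =
            serde_json::from_str(&line?).expect("valid JSON line");
        let work_id = entity["work_id"].as_str().unwrap_or("").to_string();
        if current.as_deref() != Some(work_id.as_str()) {
            // start of a new run: this work_id must never have appeared before
            assert!(
                seen.insert(work_id.clone()),
                "work_id repeated non-contiguously at line {}",
                n + 1
            );
            current = Some(work_id);
        }
    }
    println!("dump is grouped: {} distinct works", seen.len());
    Ok(())
}
```

Run as, e.g., `zcat release_export_expanded.json.gz | grouping-check` (hypothetical binary name).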
