-rw-r--r-- fatcat_scholar/sim_pipeline.py | 65
-rw-r--r-- notes/indexing_pipeline.md     |  8
2 files changed, 73 insertions(+), 0 deletions(-)
diff --git a/fatcat_scholar/sim_pipeline.py b/fatcat_scholar/sim_pipeline.py
index 926d943..0277619 100644
--- a/fatcat_scholar/sim_pipeline.py
+++ b/fatcat_scholar/sim_pipeline.py
@@ -38,6 +38,8 @@ def truncate_issue_meta(full: Dict[str, Any]) -> Dict[str, Any]:
     Same as truncate_pub_meta() but for issue item metadata
     """
     full.pop("files")
+    full.pop("histograms", None)
+    full.pop("rotations", None)
     return full
 
 
@@ -132,6 +134,11 @@ class SimPipeline:
         return pages
 
     def run_issue_db(self, limit: int = None) -> None:
+        """
+        Legacy/deprecated code path.
+
+        Runs DB iteration and fetching in a single thread/process.
+        """
         count = 0
         self.issue_db.db.row_factory = sqlite3.Row
         cur = self.issue_db.db.cursor()
@@ -165,6 +172,45 @@ class SimPipeline:
             if limit is not None and count >= limit:
                 break
 
+    def run_print_issues(self, max_release_count: int = 5) -> None:
+        """
+        Iterates over issues, printing them as TSV rows to stdout.
+
+        Intended to be used with GNU parallel for coarse parallelism.
+        """
+        self.issue_db.db.row_factory = sqlite3.Row
+        cur = self.issue_db.db.cursor()
+        for row in cur.execute(
+            f"SELECT * FROM sim_issue LEFT JOIN sim_pub ON sim_issue.sim_pubid = sim_pub.sim_pubid WHERE sim_issue.release_count < {max_release_count}"
+        ):
+            # filter out "contents" and "index" items
+            # TODO: more filters; also redundant with IssueDB code?
+            if row["issue_item"].endswith("_contents") or row["issue_item"].endswith(
+                "_index"
+            ):
+                continue
+            print(f"{row['issue_item']}\t{row['pub_collection']}")
+
+    def run_fetch_issue(self, issue_item: str, pub_collection: str) -> None:
+        """
+        Fetches a single SIM issue and prints page bundles as JSON to stdout.
+
+        TODO: more advanced retries?
+        """
+        try:
+            full_issue = self.fetch_sim_issue(issue_item, pub_collection)
+        except requests.exceptions.ConnectionError as e:
+            print(str(e), file=sys.stderr)
+            return
+        except requests.exceptions.ReadTimeout as e:
+            print(str(e), file=sys.stderr)
+            return
+        if not full_issue:
+            return
+        pages = self.full_issue_to_pages(full_issue)
+        for bundle in pages:
+            print(bundle.json(exclude_none=True, sort_keys=True))
+
 
 def main() -> None:
     """
@@ -189,6 +235,19 @@ def main() -> None:
     sub.set_defaults(func="run_issue_db")
     sub.add_argument("--limit", help="maximum number of pages to index", type=int)
 
+    sub = subparsers.add_parser(
+        "run_print_issues",
+        help="dumps issues as simple TSV rows (for parallel processing)",
+    )
+    sub.set_defaults(func="run_print_issues")
+
+    sub = subparsers.add_parser(
+        "run_fetch_issue", help="fetches pages for given issue item"
+    )
+    sub.add_argument("issue_item", type=str)
+    sub.add_argument("pub_collection", type=str)
+    sub.set_defaults(func="run_fetch_issue")
+
     args = parser.parse_args()
     if not args.__dict__.get("func"):
         parser.print_help(file=sys.stderr)
@@ -198,6 +257,12 @@ def main() -> None:
 
     if args.func == "run_issue_db":
         sp.run_issue_db(limit=args.limit)
+    elif args.func == "run_print_issues":
+        sp.run_print_issues()
+    elif args.func == "run_fetch_issue":
+        sp.run_fetch_issue(
+            issue_item=args.issue_item, pub_collection=args.pub_collection
+        )
     else:
         func = getattr(sp, args.func)
         func()
diff --git a/notes/indexing_pipeline.md b/notes/indexing_pipeline.md
index f891d27..ce4d687 100644
--- a/notes/indexing_pipeline.md
+++ b/notes/indexing_pipeline.md
@@ -46,3 +46,11 @@ Transform and index both into local elasticsearch:
 
     => 132635 docs in 2m18.787824205s at 955.667 docs/s with 4 workers
 
+## Iterated
+
+    # in pipenv shell
+    python -m fatcat_scholar.sim_pipeline run_print_issues \
+        | parallel -j8 --colsep "\t" python -m fatcat_scholar.sim_pipeline run_fetch_issue {1} {2} \
+        | pv -l \
+        | gzip \
+        > data/sim_intermediate.json.gz
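The new `run_print_issues` query interpolates `max_release_count` into the SQL with an f-string. That is harmless here, since the value always comes from the typed `int` default rather than user input, but sqlite3 also supports bound parameters, which sidestep quoting entirely. A minimal standalone sketch of the same query with a `?` placeholder, assuming the same `sim_issue`/`sim_pub` schema (the `print_issue_rows` helper and `db_path` argument are hypothetical, not part of this patch):

    import sqlite3

    def print_issue_rows(db_path: str, max_release_count: int = 5) -> None:
        # hypothetical helper, not from the patch: same query as
        # run_print_issues(), but with a bound "?" parameter so sqlite3
        # handles the value instead of f-string interpolation
        db = sqlite3.connect(db_path)
        db.row_factory = sqlite3.Row
        cur = db.cursor()
        cur.execute(
            "SELECT * FROM sim_issue LEFT JOIN sim_pub "
            "ON sim_issue.sim_pubid = sim_pub.sim_pubid "
            "WHERE sim_issue.release_count < ?",
            (max_release_count,),
        )
        for row in cur:
            # endswith() accepts a tuple, collapsing the two-branch filter
            if row["issue_item"].endswith(("_contents", "_index")):
                continue
            print(f"{row['issue_item']}\t{row['pub_collection']}")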
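Before kicking off the full GNU parallel run added to `notes/indexing_pipeline.md`, the two subcommands can be smoke-tested on a single issue: the two TSV columns printed by `run_print_issues` are exactly the positional arguments that `run_fetch_issue` expects. `ISSUE_ITEM` and `PUB_COLLECTION` below are placeholders, not real identifiers:

    # grab one TSV row to inspect
    python -m fatcat_scholar.sim_pipeline run_print_issues | head -n1
    # then fetch just that issue, substituting the two printed columns
    python -m fatcat_scholar.sim_pipeline run_fetch_issue ISSUE_ITEM PUB_COLLECTION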