diff options
author | Bryan Newbold <bnewbold@archive.org> | 2021-01-26 00:55:05 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2021-01-26 00:55:05 -0800 |
commit | 15c7c9ea0f09b2e30dffa85cd79a9f761ea29607 (patch) | |
tree | 228075896b62d49c254bb588de1122948f8ccde4 /fatcat_scholar/sim_pipeline.py | |
parent | 2995379f558e8f5c2712bb17467586644d2d2fb4 (diff) | |
download | fatcat-scholar-15c7c9ea0f09b2e30dffa85cd79a9f761ea29607.tar.gz fatcat-scholar-15c7c9ea0f09b2e30dffa85cd79a9f761ea29607.zip |
sim indexing: new parallel fetch structure
Diffstat (limited to 'fatcat_scholar/sim_pipeline.py')
-rw-r--r-- | fatcat_scholar/sim_pipeline.py | 65 |
1 file changed, 65 insertions, 0 deletions
diff --git a/fatcat_scholar/sim_pipeline.py b/fatcat_scholar/sim_pipeline.py index 926d943..0277619 100644 --- a/fatcat_scholar/sim_pipeline.py +++ b/fatcat_scholar/sim_pipeline.py @@ -38,6 +38,8 @@ def truncate_issue_meta(full: Dict[str, Any]) -> Dict[str, Any]: Same as truncate_pub_meta() but for issue item metadata """ full.pop("files") + full.pop("histograms", None) + full.pop("rotations", None) return full @@ -132,6 +134,11 @@ class SimPipeline: return pages def run_issue_db(self, limit: int = None) -> None: + """ + Legacy/Deprecated code path + + Runs DB iteration and fetching in single thread/process + """ count = 0 self.issue_db.db.row_factory = sqlite3.Row cur = self.issue_db.db.cursor() @@ -165,6 +172,45 @@ class SimPipeline: if limit is not None and count >= limit: break + def run_print_issues(self, max_release_count: int = 5) -> None: + """ + Iterates over issues, printing as TSV to stdout + + Intended to be used with GNU/parallel for coarse parallelism. + """ + self.issue_db.db.row_factory = sqlite3.Row + cur = self.issue_db.db.cursor() + for row in cur.execute( + f"SELECT * FROM sim_issue LEFT JOIN sim_pub ON sim_issue.sim_pubid = sim_pub.sim_pubid WHERE sim_issue.release_count < {max_release_count}" + ): + # filter out "contents" and "index" items + # TODO: more filters; also redundant with IssueDB code? + if row["issue_item"].endswith("_contents") or row["issue_item"].endswith( + "_index" + ): + continue + print(f"{row['issue_item']}\t{row['pub_collection']}") + + def run_fetch_issue(self, issue_item: str, pub_collection: str) -> None: + """ + Fetches SIM issue. + + TODO: more advanced retries? 
+ """ + try: + full_issue = self.fetch_sim_issue(issue_item, pub_collection) + except requests.exceptions.ConnectionError as e: + print(str(e), file=sys.stderr) + return + except requests.exceptions.ReadTimeout as e: + print(str(e), file=sys.stderr) + return + if not full_issue: + return + pages = self.full_issue_to_pages(full_issue) + for bundle in pages: + print(bundle.json(exclude_none=True, sort_keys=True)) + def main() -> None: """ @@ -189,6 +235,19 @@ def main() -> None: sub.set_defaults(func="run_issue_db") sub.add_argument("--limit", help="maximum number of pages to index", type=int) + sub = subparsers.add_parser( + "run_print_issues", + help="dumps issues as simple TSV rows (for parallel processing)", + ) + sub.set_defaults(func="run_print_issues") + + sub = subparsers.add_parser( + "run_fetch_issue", help="fetches pages for given issue item" + ) + sub.add_argument("issue_item", type=str) + sub.add_argument("pub_collection", type=str) + sub.set_defaults(func="run_fetch_issue") + args = parser.parse_args() if not args.__dict__.get("func"): parser.print_help(file=sys.stderr) @@ -198,6 +257,12 @@ def main() -> None: if args.func == "run_issue_db": sp.run_issue_db(limit=args.limit) + elif args.func == "run_print_issues": + sp.run_print_issues() + elif args.func == "run_fetch_issue": + sp.run_fetch_issue( + issue_item=args.issue_item, pub_collection=args.pub_collection + ) else: func = getattr(sp, args.func) func() |