author     Bryan Newbold <bnewbold@archive.org>  2021-01-26 00:55:05 -0800
committer  Bryan Newbold <bnewbold@archive.org>  2021-01-26 00:55:05 -0800
commit     15c7c9ea0f09b2e30dffa85cd79a9f761ea29607 (patch)
tree       228075896b62d49c254bb588de1122948f8ccde4 /fatcat_scholar/sim_pipeline.py
parent     2995379f558e8f5c2712bb17467586644d2d2fb4 (diff)
sim indexing: new parallel fetch structure
Diffstat (limited to 'fatcat_scholar/sim_pipeline.py')
-rw-r--r--  fatcat_scholar/sim_pipeline.py  |  65
1 file changed, 65 insertions, 0 deletions
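The commit splits the old single-process fetch loop into two subcommands: run_print_issues dumps issue_item/pub_collection pairs as TSV rows, and run_fetch_issue fetches a single issue, so a tool like GNU parallel can fan the rows out across worker processes (roughly: run_print_issues | parallel --colsep '\t' run_fetch_issue {1} {2}). Below is a minimal sketch of an equivalent driver; the "python -m fatcat_scholar.sim_pipeline" entrypoint is an assumption, not something this commit defines.

    # Hypothetical driver emulating the intended coarse fan-out;
    # the module entrypoint name is an assumption.
    import subprocess
    from multiprocessing import Pool

    CMD = ["python", "-m", "fatcat_scholar.sim_pipeline"]

    def fetch(row: str) -> None:
        # one subprocess per issue, like a GNU parallel job slot
        issue_item, pub_collection = row.split("\t")
        subprocess.run(CMD + ["run_fetch_issue", issue_item, pub_collection])

    if __name__ == "__main__":
        rows = subprocess.run(
            CMD + ["run_print_issues"], capture_output=True, text=True, check=True
        ).stdout.splitlines()
        with Pool(processes=8) as pool:
            pool.map(fetch, rows)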
diff --git a/fatcat_scholar/sim_pipeline.py b/fatcat_scholar/sim_pipeline.py
index 926d943..0277619 100644
--- a/fatcat_scholar/sim_pipeline.py
+++ b/fatcat_scholar/sim_pipeline.py
@@ -38,6 +38,8 @@ def truncate_issue_meta(full: Dict[str, Any]) -> Dict[str, Any]:
     Same as truncate_pub_meta() but for issue item metadata
     """
     full.pop("files")
+    full.pop("histograms", None)
+    full.pop("rotations", None)
     return full
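Note that the two new pop() calls pass a default, so issue items lacking those keys don't raise; the pre-existing pop("files") has no default and would raise KeyError if "files" were absent. A quick illustration, with made-up metadata values:

    # dict.pop with a default tolerates absent keys (values are made up):
    meta = {"files": ["page_0001.jp2"], "histograms": [1, 2, 3]}
    meta.pop("histograms", None)  # present: removed, value returned
    meta.pop("rotations", None)   # absent: returns None, no KeyError
    meta.pop("files")             # no default: raises KeyError if missing
    assert meta == {}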
@@ -132,6 +134,11 @@ class SimPipeline:
         return pages

     def run_issue_db(self, limit: int = None) -> None:
+        """
+        Legacy/Deprecated code path
+
+        Runs DB iteration and fetching in single thread/process
+        """
         count = 0
         self.issue_db.db.row_factory = sqlite3.Row
         cur = self.issue_db.db.cursor()
@@ -165,6 +172,45 @@ class SimPipeline:
             if limit is not None and count >= limit:
                 break

+    def run_print_issues(self, max_release_count: int = 5) -> None:
+        """
+        Iterates over issues, printing as TSV to stdout
+
+        Intended to be used with GNU/parallel for coarse parallelism.
+        """
+        self.issue_db.db.row_factory = sqlite3.Row
+        cur = self.issue_db.db.cursor()
+        for row in cur.execute(
+            f"SELECT * FROM sim_issue LEFT JOIN sim_pub ON sim_issue.sim_pubid = sim_pub.sim_pubid WHERE sim_issue.release_count < {max_release_count}"
+        ):
+            # filter out "contents" and "index" items
+            # TODO: more filters; also redundant with IssueDB code?
+            if row["issue_item"].endswith("_contents") or row["issue_item"].endswith(
+                "_index"
+            ):
+                continue
+            print(f"{row['issue_item']}\t{row['pub_collection']}")
+
+    def run_fetch_issue(self, issue_item: str, pub_collection: str) -> None:
+        """
+        Fetches SIM issue.
+
+        TODO: more advanced retries?
+        """
+        try:
+            full_issue = self.fetch_sim_issue(issue_item, pub_collection)
+        except requests.exceptions.ConnectionError as e:
+            print(str(e), file=sys.stderr)
+            return
+        except requests.exceptions.ReadTimeout as e:
+            print(str(e), file=sys.stderr)
+            return
+        if not full_issue:
+            return
+        pages = self.full_issue_to_pages(full_issue)
+        for bundle in pages:
+            print(bundle.json(exclude_none=True, sort_keys=True))
+

 def main() -> None:
     """
@@ -189,6 +235,19 @@ def main() -> None:
     sub.set_defaults(func="run_issue_db")
     sub.add_argument("--limit", help="maximum number of pages to index", type=int)

+    sub = subparsers.add_parser(
+        "run_print_issues",
+        help="dumps issues as simple TSV rows (for parallel processing)",
+    )
+    sub.set_defaults(func="run_print_issues")
+
+    sub = subparsers.add_parser(
+        "run_fetch_issue", help="fetches pages for given issue item"
+    )
+    sub.add_argument("issue_item", type=str)
+    sub.add_argument("pub_collection", type=str)
+    sub.set_defaults(func="run_fetch_issue")
+
     args = parser.parse_args()
     if not args.__dict__.get("func"):
         parser.print_help(file=sys.stderr)
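One note on the run_print_issues query above: max_release_count is interpolated into the SQL via an f-string. Since argparse types it as an int this works, but sqlite3's "?" placeholders are the more defensive form. A sketch of that alternative, not the committed code (print_issue_rows is a hypothetical standalone version):

    import sqlite3

    def print_issue_rows(db: sqlite3.Connection, max_release_count: int = 5) -> None:
        # same query as run_print_issues, with a bound parameter
        db.row_factory = sqlite3.Row
        cur = db.cursor()
        cur.execute(
            "SELECT * FROM sim_issue "
            "LEFT JOIN sim_pub ON sim_issue.sim_pubid = sim_pub.sim_pubid "
            "WHERE sim_issue.release_count < ?",
            (max_release_count,),
        )
        for row in cur:
            print(f"{row['issue_item']}\t{row['pub_collection']}")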
@@ -198,6 +257,12 @@ def main() -> None:
     if args.func == "run_issue_db":
         sp.run_issue_db(limit=args.limit)
+    elif args.func == "run_print_issues":
+        sp.run_print_issues()
+    elif args.func == "run_fetch_issue":
+        sp.run_fetch_issue(
+            issue_item=args.issue_item, pub_collection=args.pub_collection
+        )
     else:
         func = getattr(sp, args.func)
         func()