Diffstat (limited to 'fatcat_scholar/work_pipeline.py')
-rw-r--r-- | fatcat_scholar/work_pipeline.py | 188
1 files changed, 120 insertions, 68 deletions
diff --git a/fatcat_scholar/work_pipeline.py b/fatcat_scholar/work_pipeline.py
index 46e40e1..af558a3 100644
--- a/fatcat_scholar/work_pipeline.py
+++ b/fatcat_scholar/work_pipeline.py
@@ -1,4 +1,3 @@
-
 import os
 import io
 import sys
@@ -12,9 +11,17 @@ import internetarchive
 
 from fatcat_scholar.api_entities import *
 from fatcat_scholar.djvu import djvu_extract_leaf_texts
-from fatcat_scholar.sandcrawler import SandcrawlerPostgrestClient, SandcrawlerMinioClient
+from fatcat_scholar.sandcrawler import (
+    SandcrawlerPostgrestClient,
+    SandcrawlerMinioClient,
+)
 from fatcat_scholar.issue_db import IssueDB, SimIssueRow, SimPubRow
-from fatcat_scholar.schema import es_biblio_from_release, es_release_from_release, DocType, IntermediateBundle
+from fatcat_scholar.schema import (
+    es_biblio_from_release,
+    es_release_from_release,
+    DocType,
+    IntermediateBundle,
+)
 from fatcat_scholar.sim_pipeline import truncate_pub_meta, truncate_issue_meta
 
 
@@ -25,17 +32,18 @@ def parse_pages(raw: str) -> Tuple[Optional[int], Optional[int]]:
     first = int(first_raw)
     if not "-" in raw:
         return (first, first)
-    last_raw = raw.split('-')[-1]
+    last_raw = raw.split("-")[-1]
     if not last_raw.isdigit():
         return (first, first)
     last = int(last_raw)
     if last < first:
-        last_munge = first_raw[0:(len(first_raw)-len(last_raw))] + last_raw
+        last_munge = first_raw[0 : (len(first_raw) - len(last_raw))] + last_raw
         last = int(last_munge)
     if last < first:
         return (first, first)
     return (first, last)
 
+
 def test_parse_pages():
     assert parse_pages("479-89") == (479, 489)
     assert parse_pages("466-7") == (466, 467)
@@ -52,24 +60,33 @@ def fulltext_pref_list(releases: List[ReleaseEntity]) -> List[str]:
     Returns a list of release idents in preference order (best first) to try
     and find fulltext for.
     """
-    releases_sorted = sorted(releases, reverse=True, key=lambda r: (
-        r.release_stage=="updated",
-        r.release_stage=="published",
-        r.volume is not None,
-        r.container_id is not None,
-        r.ext_ids.pmid is not None,
-        r.release_stage=="submitted",
-        r.release_type is not None,
-        r.release_year,
-        r.release_date,
-        r.version,
-    ))
+    releases_sorted = sorted(
+        releases,
+        reverse=True,
+        key=lambda r: (
+            r.release_stage == "updated",
+            r.release_stage == "published",
+            r.volume is not None,
+            r.container_id is not None,
+            r.ext_ids.pmid is not None,
+            r.release_stage == "submitted",
+            r.release_type is not None,
+            r.release_year,
+            r.release_date,
+            r.version,
+        ),
+    )
     return [r.ident for r in releases_sorted]
 
 
-class WorkPipeline():
-
-    def __init__(self, issue_db: IssueDB, sandcrawler_db_client: SandcrawlerPostgrestClient, sandcrawler_s3_client: SandcrawlerMinioClient, fulltext_cache_dir=None):
+class WorkPipeline:
+    def __init__(
+        self,
+        issue_db: IssueDB,
+        sandcrawler_db_client: SandcrawlerPostgrestClient,
+        sandcrawler_s3_client: SandcrawlerMinioClient,
+        fulltext_cache_dir=None,
+    ):
         self.issue_db: IssueDB = issue_db
         self.ia_client = internetarchive.get_session()
         self.sandcrawler_db_client = sandcrawler_db_client
@@ -87,9 +104,9 @@ class WorkPipeline():
         if not fe.urls:
             return None
         grobid_meta = self.sandcrawler_db_client.get_grobid(fe.sha1)
-        if not grobid_meta or grobid_meta['status'] != 'success':
+        if not grobid_meta or grobid_meta["status"] != "success":
             return None
-        #print(grobid_meta)
+        # print(grobid_meta)
         try:
             grobid_xml = self.sandcrawler_s3_client.get_blob(
                 folder="grobid",
@@ -98,13 +115,11 @@ class WorkPipeline():
                 prefix="",
                 bucket="sandcrawler",
             )
-            #print(grobid_xml)
+            # print(grobid_xml)
         except minio.error.NoSuchKey:
             return None
         return dict(
-            tei_xml=grobid_xml,
-            release_ident=release_ident,
-            file_ident=fe.ident,
+            tei_xml=grobid_xml, release_ident=release_ident, file_ident=fe.ident,
         )
 
     def fetch_file_pdftotext(self, fe: FileEntity, release_ident: str) -> Optional[Any]:
@@ -115,14 +130,14 @@ class WorkPipeline():
         """
         # HACK: look for local pdftotext output
         if self.fulltext_cache_dir:
-            local_txt_path = f"{self.fulltext_cache_dir}/pdftotext/{fe.sha1[:2]}/{fe.sha1}.txt"
+            local_txt_path = (
+                f"{self.fulltext_cache_dir}/pdftotext/{fe.sha1[:2]}/{fe.sha1}.txt"
+            )
             try:
-                with open(local_txt_path, 'r') as txt_file:
+                with open(local_txt_path, "r") as txt_file:
                     raw_text = txt_file.read()
                     return dict(
-                        raw_text=raw_text,
-                        release_ident=release_ident,
-                        file_ident=fe.ident,
+                        raw_text=raw_text, release_ident=release_ident, file_ident=fe.ident,
                     )
             except FileNotFoundError:
                 pass
@@ -144,9 +159,17 @@ class WorkPipeline():
         if not sim_pubid:
             return None
 
-        return self.issue_db.lookup_issue(sim_pubid=sim_pubid, volume=release.volume, issue=release.issue)
+        return self.issue_db.lookup_issue(
+            sim_pubid=sim_pubid, volume=release.volume, issue=release.issue
+        )
 
-    def fetch_sim(self, issue_db_row: SimIssueRow, issue_db_pub_row: SimPubRow, pages: str, release_ident: str) -> Optional[Any]:
+    def fetch_sim(
+        self,
+        issue_db_row: SimIssueRow,
+        issue_db_pub_row: SimPubRow,
+        pages: str,
+        release_ident: str,
+    ) -> Optional[Any]:
         """
         issue_item
         pages: str
@@ -169,17 +192,17 @@ class WorkPipeline():
 
         leaf_index = dict()
         leaf_list = []
-        if not 'page_numbers' in issue_meta:
+        if not "page_numbers" in issue_meta:
             # TODO: warn
             return None
-        for entry in issue_meta['page_numbers'].get('pages', []):
-            page_num = entry['pageNumber']
-            leaf_index[entry['leafNum']] = page_num
+        for entry in issue_meta["page_numbers"].get("pages", []):
+            page_num = entry["pageNumber"]
+            leaf_index[entry["leafNum"]] = page_num
             if not (page_num and page_num.isdigit()):
                 continue
             page_num = int(page_num)
             if page_num >= first_page and page_num <= last_page:
-                leaf_list.append(entry['leafNum'])
+                leaf_list.append(entry["leafNum"])
 
         if not leaf_list:
             return None
@@ -190,16 +213,22 @@ class WorkPipeline():
 
         # override 'close()' method so we can still read out contents
         djvu_bytes = io.BytesIO()
-        djvu_bytes.close = lambda: None # type: ignore
+        djvu_bytes.close = lambda: None  # type: ignore
         assert issue_item_djvu.download(fileobj=djvu_bytes) == True
         djvu_bytes.seek(0)
         djvu_xml = io.StringIO(djvu_bytes.read().decode("UTF-8"))
-        del(djvu_bytes)
+        del djvu_bytes
 
         leaf_dict = djvu_extract_leaf_texts(djvu_xml, only_leaves=leaf_list)
 
         for leaf_num, raw_text in leaf_dict.items():
-            page_texts.append(dict(page_num=leaf_index.get(leaf_num), leaf_num=leaf_num, raw_text=raw_text))
+            page_texts.append(
+                dict(
+                    page_num=leaf_index.get(leaf_num),
+                    leaf_num=leaf_num,
+                    raw_text=raw_text,
+                )
+            )
 
         return dict(
             issue_item=issue_db_row.issue_item,
@@ -220,7 +249,7 @@ class WorkPipeline():
 
         pref_idents = fulltext_pref_list(releases)
         release_dict = dict([(r.ident, r) for r in releases])
-        #print(f"pref_idents={pref_idents}", file=sys.stderr)
+        # print(f"pref_idents={pref_idents}", file=sys.stderr)
 
         # find best accessible fatcat file
         grobid_fulltext: Optional[Any] = None
@@ -244,12 +273,12 @@ class WorkPipeline():
         sim_issue: Optional[Any] = None
         for ident in pref_idents:
             release = release_dict[ident]
-            #print(f"{release.extra}\n{release.pages}", file=sys.stderr)
+            # print(f"{release.extra}\n{release.pages}", file=sys.stderr)
             if not release.pages:
                 continue
             # TODO: in the future, will use release.extra.ia.sim.sim_pubid for lookup
             sim_issue = self.lookup_sim(release)
-            #print(f"release_{release.ident}: sim_issue={sim_issue}", file=sys.stderr)
+            # print(f"release_{release.ident}: sim_issue={sim_issue}", file=sys.stderr)
             if not sim_issue:
                 continue
             sim_pub = self.issue_db.lookup_pub(sim_issue.sim_pubid)
@@ -257,7 +286,9 @@ class WorkPipeline():
                 continue
             # XXX: control flow tweak?
             try:
-                sim_fulltext = self.fetch_sim(sim_issue, sim_pub, release.pages, release.ident)
+                sim_fulltext = self.fetch_sim(
+                    sim_issue, sim_pub, release.pages, release.ident
+                )
             except requests.exceptions.ConnectionError as e:
                 print(str(e), file=sys.stderr)
                 continue
@@ -300,13 +331,16 @@ class WorkPipeline():
                 ib = self.process_release_list(batch)
                 print(ib.json())
                 batch_work_id = None
-            batch = [release,]
+            batch = [
+                release,
+            ]
             batch_work_id = release.work_id
 
         if batch:
             ib = self.process_release_list(batch)
             print(ib.json())
 
+
 def main():
     """
     Run this command like:
@@ -315,31 +349,46 @@ def main():
     """
 
     parser = argparse.ArgumentParser(
-        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter
+    )
     subparsers = parser.add_subparsers()
 
-    parser.add_argument("--issue-db-file",
+    parser.add_argument(
+        "--issue-db-file",
         help="sqlite3 database file to open",
-        default='data/issue_db.sqlite',
-        type=str)
-    parser.add_argument("--sandcrawler-db-api",
+        default="data/issue_db.sqlite",
+        type=str,
+    )
+    parser.add_argument(
+        "--sandcrawler-db-api",
         help="Sandcrawler Postgrest API endpoint",
-        default='http://aitio.us.archive.org:3030',
-        type=str)
-    parser.add_argument("--sandcrawler-s3-api",
+        default="http://aitio.us.archive.org:3030",
+        type=str,
+    )
+    parser.add_argument(
+        "--sandcrawler-s3-api",
         help="Sandcrawler S3 (minio/seaweedfs) API endpoint",
-        default='aitio.us.archive.org:9000',
-        type=str)
+        default="aitio.us.archive.org:9000",
+        type=str,
+    )
 
-    sub = subparsers.add_parser('run_releases',
-        help="takes expanded release entity JSON, sorted by work_ident")
-    sub.set_defaults(func='run_releases')
-    sub.add_argument("json_file",
+    sub = subparsers.add_parser(
+        "run_releases", help="takes expanded release entity JSON, sorted by work_ident"
+    )
+    sub.set_defaults(func="run_releases")
+    sub.add_argument(
+        "json_file",
         help="release entities, as JSON-lines",
-        nargs='?', default=sys.stdin, type=argparse.FileType('r'))
-    sub.add_argument("--fulltext-cache-dir",
+        nargs="?",
+        default=sys.stdin,
+        type=argparse.FileType("r"),
+    )
+    sub.add_argument(
+        "--fulltext-cache-dir",
         help="path of local directory with pdftotext fulltext (and thumbnails)",
-        default=None, type=str)
+        default=None,
+        type=str,
+    )
 
     args = parser.parse_args()
     if not args.__dict__.get("func"):
@@ -348,20 +397,23 @@ def main():
 
     wp = WorkPipeline(
         issue_db=IssueDB(args.issue_db_file),
-        sandcrawler_db_client=SandcrawlerPostgrestClient(api_url=args.sandcrawler_db_api),
+        sandcrawler_db_client=SandcrawlerPostgrestClient(
+            api_url=args.sandcrawler_db_api
+        ),
         sandcrawler_s3_client=SandcrawlerMinioClient(
             host_url=args.sandcrawler_s3_api,
-            access_key=os.environ.get('MINIO_ACCESS_KEY'),
-            secret_key=os.environ.get('MINIO_SECRET_KEY'),
+            access_key=os.environ.get("MINIO_ACCESS_KEY"),
+            secret_key=os.environ.get("MINIO_SECRET_KEY"),
        ),
         fulltext_cache_dir=args.fulltext_cache_dir,
     )
 
-    if args.func == 'run_releases':
+    if args.func == "run_releases":
         wp.run_releases(args.json_file)
    else:
        func = getattr(wp, args.func)
        func()
 
-if __name__=="__main__":
+
+if __name__ == "__main__":
    main()
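The change above is formatting-only (black), so the pipeline's behavior is unchanged. For reference, the wiring that main() performs can be reproduced directly from the constructor signature and defaults visible in the diff. A minimal sketch, assuming MINIO_ACCESS_KEY/MINIO_SECRET_KEY are set in the environment and using a hypothetical input file name (releases_expanded.json, JSON-lines release entities sorted by work_ident, matching what the run_releases subcommand expects):

    import os

    from fatcat_scholar.issue_db import IssueDB
    from fatcat_scholar.sandcrawler import (
        SandcrawlerPostgrestClient,
        SandcrawlerMinioClient,
    )
    from fatcat_scholar.work_pipeline import WorkPipeline

    # Wire up the same three backends that main() constructs, using the
    # default endpoints shown in the diff above.
    wp = WorkPipeline(
        issue_db=IssueDB("data/issue_db.sqlite"),
        sandcrawler_db_client=SandcrawlerPostgrestClient(
            api_url="http://aitio.us.archive.org:3030"
        ),
        sandcrawler_s3_client=SandcrawlerMinioClient(
            host_url="aitio.us.archive.org:9000",
            access_key=os.environ.get("MINIO_ACCESS_KEY"),
            secret_key=os.environ.get("MINIO_SECRET_KEY"),
        ),
        fulltext_cache_dir=None,
    )

    # run_releases() consumes JSON-lines release entities grouped by work_ident
    # and prints one IntermediateBundle JSON document per work to stdout.
    # The input file name here is hypothetical.
    with open("releases_expanded.json") as json_file:
        wp.run_releases(json_file)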