diff options
Diffstat (limited to 'fatcat_scholar/work_pipeline.py')
-rw-r--r-- | fatcat_scholar/work_pipeline.py | 61 |
1 files changed, 49 insertions, 12 deletions
diff --git a/fatcat_scholar/work_pipeline.py b/fatcat_scholar/work_pipeline.py index 7ed0eac..161327a 100644 --- a/fatcat_scholar/work_pipeline.py +++ b/fatcat_scholar/work_pipeline.py @@ -1,4 +1,5 @@ +import os import io import sys import argparse @@ -9,6 +10,7 @@ import internetarchive from fatcat_scholar.api_entities import * from fatcat_scholar.djvu import djvu_extract_leaf_texts +from fatcat_scholar.sandcrawler import SandcrawlerPostgrestClient, SandcrawlerMinioClient from fatcat_scholar.issue_db import IssueDB, SimIssueRow from fatcat_scholar.es_transform import es_biblio_from_release, es_release_from_release, DocType @@ -79,21 +81,41 @@ def fulltext_pref_list(releases: List[ReleaseEntity]) -> List[str]: class WorkPipeline(): - def __init__(self, issue_db: IssueDB): + def __init__(self, issue_db: IssueDB, sandcrawler_db_client: SandcrawlerPostgrestClient, sandcrawler_s3_client: SandcrawlerMinioClient): self.issue_db: IssueDB = issue_db self.ia_client = internetarchive.get_session() - # TODO: postgrest client - # TODO: minio client + self.sandcrawler_db_client = sandcrawler_db_client + self.sandcrawler_s3_client = sandcrawler_s3_client - def fetch_file_grobid(self, fe: FileEntity) -> Optional[Any]: + def fetch_file_grobid(self, fe: FileEntity, release_ident: str) -> Optional[Any]: """ tei_xml: str release_ident: Optional[str] file_ident: Optional[str] """ - return None + if not fe.sha1: + return None + if not fe.urls: + return None + grobid_meta = self.sandcrawler_db_client.get_grobid(fe.sha1) + if grobid_meta['status'] != 'success': + return None + #print(grobid_meta) + grobid_xml = self.sandcrawler_s3_client.get_blob( + folder="grobid", + sha1hex=fe.sha1, + extension=".tei.xml", + prefix="", + bucket="sandcrawler", + ) + #print(grobid_xml) + return dict( + tei_xml=grobid_xml, + release_ident=release_ident, + file_ident=fe.ident, + ) - def fetch_file_pdftotext(self, fe: FileEntity) -> Optional[Any]: + def fetch_file_pdftotext(self, fe: FileEntity, release_ident: str) -> Optional[Any]: """ raw_text: str release_ident: Optional[str] @@ -190,8 +212,8 @@ class WorkPipeline(): #print(f"pref_idents={pref_idents}", file=sys.stderr) # find best accessible fatcat file - grobid_fulltext: Optional[Any] - pdftotext_fulltext: Optional[Any] + grobid_fulltext: Optional[Any] = None + pdftotext_fulltext: Optional[Any] = None for ident in pref_idents: release = release_dict[ident] if not release.files: @@ -199,8 +221,8 @@ class WorkPipeline(): for fe in release.files: if not fe.sha1 or fe.mimetype not in (None, "application/pdf"): continue - grobid_fulltext = self.fetch_file_grobid(fe) - pdftotext_fulltext = self.fetch_file_pdftotext(fe) + grobid_fulltext = self.fetch_file_grobid(fe, ident) + pdftotext_fulltext = self.fetch_file_pdftotext(fe, ident) if grobid_fulltext or pdftotext_fulltext: break if grobid_fulltext or pdftotext_fulltext: @@ -279,6 +301,14 @@ def main(): help="sqlite3 database file to open", default='data/issue_db.sqlite', type=str) + parser.add_argument("--sandcrawler-db-api", + help="Sandcrawler Postgrest API endpoint", + default='http://aitio.us.archive.org:3030', + type=str) + parser.add_argument("--sandcrawler-s3-api", + help="Sandcrawler S3 (minio/seaweedfs) API endpoint", + default='aitio.us.archive.org:9000', + type=str) sub = subparsers.add_parser('run_releases', help="takes expanded release entity JSON, sorted by work_ident") @@ -292,8 +322,15 @@ def main(): print("tell me what to do! (try --help)") sys.exit(-1) - idb = IssueDB(args.issue_db_file) - wp = WorkPipeline(issue_db=idb) + wp = WorkPipeline( + issue_db=IssueDB(args.issue_db_file), + sandcrawler_db_client=SandcrawlerPostgrestClient(api_url=args.sandcrawler_db_api), + sandcrawler_s3_client=SandcrawlerMinioClient( + host_url=args.sandcrawler_s3_api, + access_key=os.environ.get('MINIO_ACCESS_KEY'), + secret_key=os.environ.get('MINIO_SECRET_KEY'), + ), + ) if args.func == 'run_releases': wp.run_releases(args.json_file) |