diff options
-rw-r--r-- | fatcat_scholar/sandcrawler.py | 84 | ||||
-rw-r--r-- | fatcat_scholar/work_pipeline.py | 61 |
2 files changed, 133 insertions, 12 deletions
diff --git a/fatcat_scholar/sandcrawler.py b/fatcat_scholar/sandcrawler.py
new file mode 100644
index 0000000..db6014f
--- /dev/null
+++ b/fatcat_scholar/sandcrawler.py
@@ -0,0 +1,84 @@
+
+import json
+import minio
+import requests
+from typing import Dict, Optional, Any
+
+class SandcrawlerPostgrestClient():
+
+    def __init__(self, api_url: str):
+        self.api_url = api_url
+
+    def get_grobid(self, sha1: str) -> Optional[Dict[str, Any]]:
+        """
+        Looks up the row for a file SHA-1 at the "/grobid" endpoint of the
+        Postgrest API. Returns the first matching row, or None if there is no row.
+        """
+        resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1))
+        resp.raise_for_status()
+        resp_json = resp.json()
+        if resp_json:
+            return resp_json[0]
+        else:
+            return None
+
+
+class SandcrawlerMinioClient(object):
+
+    def __init__(self, host_url: str, access_key: Optional[str] = None, secret_key: Optional[str] = None, default_bucket: Optional[str] = "sandcrawler"):
+        """
+        host is minio connection string (host:port)
+        access and secret key are as expected
+        default_bucket can be supplied so that it doesn't need to be repeated for each function call
+
+        Example config:
+
+            host="localhost:9000",
+            access_key=os.environ['MINIO_ACCESS_KEY'],
+            secret_key=os.environ['MINIO_SECRET_KEY'],
+        """
+        self.mc = minio.Minio(
+            host_url,
+            access_key=access_key,
+            secret_key=secret_key,
+            secure=False,
+        )
+        self.default_bucket = default_bucket
+
+    def _blob_path(self, folder, sha1hex, extension, prefix):
+        if not extension:
+            extension = ""
+        if not prefix:
+            prefix = ""
+        assert len(sha1hex) == 40
+        obj_path = "{}{}/{}/{}/{}{}".format(
+            prefix,
+            folder,
+            sha1hex[0:2],
+            sha1hex[2:4],
+            sha1hex,
+            extension,
+        )
+        return obj_path
+
+    def get_blob(self, folder, sha1hex, extension="", prefix="", bucket=None):
+        """
+        sha1hex is sha1 of the blob itself
+
+        Fetches blob from the given bucket/folder, using the sandcrawler SHA1 path convention
+        """
+        obj_path = self._blob_path(folder, sha1hex, extension, prefix)
+        if not bucket:
+            bucket = self.default_bucket
+        assert bucket
+        blob = self.mc.get_object(
+            bucket,
+            obj_path,
+        )
+        try:
+            # TODO: optionally verify SHA-1?
+            return blob.data
+        finally:
+            # hand the HTTP connection back to the pool, even if the read fails
+            blob.close()
+            blob.release_conn()
diff --git a/fatcat_scholar/work_pipeline.py b/fatcat_scholar/work_pipeline.py
index 7ed0eac..161327a 100644
--- a/fatcat_scholar/work_pipeline.py
+++ b/fatcat_scholar/work_pipeline.py
@@ -1,4 +1,5 @@
 
+import os
 import io
 import sys
 import argparse
@@ -9,6 +10,7 @@ import internetarchive
 
 from fatcat_scholar.api_entities import *
 from fatcat_scholar.djvu import djvu_extract_leaf_texts
+from fatcat_scholar.sandcrawler import SandcrawlerPostgrestClient, SandcrawlerMinioClient
 from fatcat_scholar.issue_db import IssueDB, SimIssueRow
 from fatcat_scholar.es_transform import es_biblio_from_release, es_release_from_release, DocType
 
@@ -79,21 +81,41 @@ def fulltext_pref_list(releases: List[ReleaseEntity]) -> List[str]:
 
 class WorkPipeline():
 
-    def __init__(self, issue_db: IssueDB):
+    def __init__(self, issue_db: IssueDB, sandcrawler_db_client: SandcrawlerPostgrestClient, sandcrawler_s3_client: SandcrawlerMinioClient):
         self.issue_db: IssueDB = issue_db
         self.ia_client = internetarchive.get_session()
-        # TODO: postgrest client
-        # TODO: minio client
+        self.sandcrawler_db_client = sandcrawler_db_client
+        self.sandcrawler_s3_client = sandcrawler_s3_client
 
-    def fetch_file_grobid(self, fe: FileEntity) -> Optional[Any]:
+    def fetch_file_grobid(self, fe: FileEntity, release_ident: str) -> Optional[Any]:
         """
         tei_xml: str
         release_ident: Optional[str]
         file_ident: Optional[str]
         """
-        return None
+        if not fe.sha1:
+            return None
+        if not fe.urls:
+            return None
+        grobid_meta = self.sandcrawler_db_client.get_grobid(fe.sha1)
+        # get_grobid() returns None when sandcrawler has no row for this file
+        if not grobid_meta or grobid_meta['status'] != 'success':
+            return None
+        grobid_xml = self.sandcrawler_s3_client.get_blob(
+            folder="grobid",
+            sha1hex=fe.sha1,
+            extension=".tei.xml",
+            prefix="",
+            bucket="sandcrawler",
+        )
+        #print(grobid_xml)
+        return dict(
+            tei_xml=grobid_xml,
+            release_ident=release_ident,
+            file_ident=fe.ident,
+        )
 
-    def fetch_file_pdftotext(self, fe: FileEntity) -> Optional[Any]:
+    def fetch_file_pdftotext(self, fe: FileEntity, release_ident: str) -> Optional[Any]:
         """
         raw_text: str
         release_ident: Optional[str]
@@ -190,8 +212,8 @@ class WorkPipeline():
         #print(f"pref_idents={pref_idents}", file=sys.stderr)
 
         # find best accessible fatcat file
-        grobid_fulltext: Optional[Any]
-        pdftotext_fulltext: Optional[Any]
+        grobid_fulltext: Optional[Any] = None
+        pdftotext_fulltext: Optional[Any] = None
         for ident in pref_idents:
             release = release_dict[ident]
             if not release.files:
@@ -199,8 +221,8 @@ class WorkPipeline():
             for fe in release.files:
                 if not fe.sha1 or fe.mimetype not in (None, "application/pdf"):
                     continue
-                grobid_fulltext = self.fetch_file_grobid(fe)
-                pdftotext_fulltext = self.fetch_file_pdftotext(fe)
+                grobid_fulltext = self.fetch_file_grobid(fe, ident)
+                pdftotext_fulltext = self.fetch_file_pdftotext(fe, ident)
                 if grobid_fulltext or pdftotext_fulltext:
                     break
             if grobid_fulltext or pdftotext_fulltext:
@@ -279,6 +301,14 @@ def main():
         help="sqlite3 database file to open",
         default='data/issue_db.sqlite',
         type=str)
+    parser.add_argument("--sandcrawler-db-api",
+        help="Sandcrawler Postgrest API endpoint",
+        default='http://aitio.us.archive.org:3030',
+        type=str)
+    parser.add_argument("--sandcrawler-s3-api",
+        help="Sandcrawler S3 (minio/seaweedfs) API endpoint",
+        default='aitio.us.archive.org:9000',
+        type=str)
 
     sub = subparsers.add_parser('run_releases',
         help="takes expanded release entity JSON, sorted by work_ident")
@@ -292,8 +322,15 @@ def main():
         print("tell me what to do! (try --help)")
         sys.exit(-1)
 
-    idb = IssueDB(args.issue_db_file)
-    wp = WorkPipeline(issue_db=idb)
+    wp = WorkPipeline(
+        issue_db=IssueDB(args.issue_db_file),
+        sandcrawler_db_client=SandcrawlerPostgrestClient(api_url=args.sandcrawler_db_api),
+        sandcrawler_s3_client=SandcrawlerMinioClient(
+            host_url=args.sandcrawler_s3_api,
+            access_key=os.environ.get('MINIO_ACCESS_KEY'),
+            secret_key=os.environ.get('MINIO_SECRET_KEY'),
+        ),
+    )
 
     if args.func == 'run_releases':
         wp.run_releases(args.json_file)