From 0abb779be2cd6fc913f3c57d891b040b40baf6c3 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Sat, 16 May 2020 19:52:17 -0700 Subject: initial progress on work pipeline --- fatcat_scholar/issue_db.py | 31 ++++ fatcat_scholar/work_pipeline.py | 305 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 336 insertions(+) create mode 100644 fatcat_scholar/work_pipeline.py (limited to 'fatcat_scholar') diff --git a/fatcat_scholar/issue_db.py b/fatcat_scholar/issue_db.py index 0d33e17..5278750 100644 --- a/fatcat_scholar/issue_db.py +++ b/fatcat_scholar/issue_db.py @@ -44,6 +44,19 @@ class SimIssueRow: def tuple(self): return (self.issue_item, self.sim_pubid, self.year, self.volume, self.issue, self.first_page, self.last_page, self.release_count) + @classmethod + def from_tuple(self, row: Any): + return SimIssueRow( + issue_item=row[0], + sim_pubid=row[1], + year=row[2], + volume=row[3], + issue=row[4], + first_page=row[5], + last_page=row[6], + release_count=row[7], + ) + @dataclass class ReleaseCountsRow: sim_pubid: str @@ -95,6 +108,7 @@ class IssueDB(): """ self.db = sqlite3.connect(db_file, isolation_level='EXCLUSIVE') self._pubid2container_map: Dict[str, Optional[str]] = dict() + self._container2pubid_map: Dict[str, Optional[str]] = dict() def init_db(self): self.db.executescript(""" @@ -135,6 +149,23 @@ class IssueDB(): self._pubid2container_map[sim_pubid] = None return None + def container2pubid(self, container_ident: str) -> Optional[str]: + if container_ident in self._container2pubid_map: + return self._container2pubid_map[container_ident] + row = list(self.db.execute("SELECT sim_pubid FROM sim_pub WHERE container_ident = ?;", [container_ident])) + if row: + self._container2pubid_map[container_ident] = row[0][0] + return row[0][0] + else: + self._pubid2container_map[container_ident] = None + return None + + def lookup_issue(self, sim_pubid: str, volume: str, issue: str) -> Optional[SimIssueRow]: + row = list(self.db.execute("SELECT * FROM sim_issue WHERE sim_pubid = ? AND volume = ? AND issue = ?;", [sim_pubid, volume, issue])) + if not row: + return None + return SimIssueRow.from_tuple(row[0]) + def load_pubs(self, json_lines: Sequence[str], api: Any): """ Reads a file (or some other iterator) of JSON lines, parses them into a diff --git a/fatcat_scholar/work_pipeline.py b/fatcat_scholar/work_pipeline.py new file mode 100644 index 0000000..7ed0eac --- /dev/null +++ b/fatcat_scholar/work_pipeline.py @@ -0,0 +1,305 @@ + +import io +import sys +import argparse +from pydantic import BaseModel, validator +from typing import List, Dict, Tuple, Optional, Any, Sequence +from fatcat_openapi_client import ReleaseEntity, FileEntity +import internetarchive + +from fatcat_scholar.api_entities import * +from fatcat_scholar.djvu import djvu_extract_leaf_texts +from fatcat_scholar.issue_db import IssueDB, SimIssueRow +from fatcat_scholar.es_transform import es_biblio_from_release, es_release_from_release, DocType + + +def parse_pages(raw: str) -> Tuple[Optional[int], Optional[int]]: + first_raw = raw.split("-")[0] + if not first_raw.isdigit(): + return (None, None) + first = int(first_raw) + if not "-" in raw: + return (first, first) + last_raw = raw.split('-')[-1] + if not last_raw.isdigit(): + return (first, first) + last = int(last_raw) + if last < first: + last_munge = first_raw[0:(len(first_raw)-len(last_raw))] + last_raw + last = int(last_munge) + if last < first: + return (first, first) + return (first, last) + +def test_parse_pages(): + assert parse_pages("479-89") == (479, 489) + assert parse_pages("466-7") == (466, 467) + assert parse_pages("466-501") == (466, 501) + assert parse_pages("466-401") == (466, 466) + # TODO: or should it be (1, 1)? + assert parse_pages("1") == (1, 1) + # TODO: should we be doing strings instead of ints? + assert parse_pages("iiv") == (None, None) + + +class IntermediateBundle(BaseModel): + doc_type: DocType + releases: List[ReleaseEntity] + biblio_release_ident: Optional[str] + grobid_fulltext: Optional[Any] + pdftotext_fulltext: Optional[Any] + sim_fulltext: Optional[Any] + + class Config: + arbitrary_types_allowed = True + json_encoders = { + ReleaseEntity: lambda re: entity_to_dict(re), + } + + +def fulltext_pref_list(releases: List[ReleaseEntity]) -> List[str]: + """ + Returns a list of release idents in preference order (best first) to + try and find fulltext for. + """ + releases_sorted = sorted(releases, reverse=True, key=lambda r: ( + r.release_stage=="updated", + r.release_stage=="published", + r.volume is not None, + r.container_id is not None, + r.ext_ids.pmid is not None, + r.release_stage=="submitted", + r.release_type is not None, + r.release_year, + r.release_date, + r.version, + )) + return [r.ident for r in releases_sorted] + + +class WorkPipeline(): + + def __init__(self, issue_db: IssueDB): + self.issue_db: IssueDB = issue_db + self.ia_client = internetarchive.get_session() + # TODO: postgrest client + # TODO: minio client + + def fetch_file_grobid(self, fe: FileEntity) -> Optional[Any]: + """ + tei_xml: str + release_ident: Optional[str] + file_ident: Optional[str] + """ + return None + + def fetch_file_pdftotext(self, fe: FileEntity) -> Optional[Any]: + """ + raw_text: str + release_ident: Optional[str] + file_ident: Optional[str] + """ + return None + + def lookup_sim(self, release: ReleaseEntity) -> Optional[SimIssueRow]: + """ + Checks in IssueDB to see if this release is likely to have a copy in a + SIM issue item. + + volume + issue + """ + if not (release.container_id and release.volume and release.issue): + return None + sim_pubid = self.issue_db.container2pubid(release.container_id) + if not sim_pubid: + return None + + return self.issue_db.lookup_issue(sim_pubid=sim_pubid, volume=release.volume, issue=release.issue) + + def fetch_sim(self, issue_db_row: SimIssueRow, pages: str, release_ident: str) -> Optional[Any]: + """ + issue_item + pages: str + page_texts: list + page_number + raw_text + release_ident: Optional[str] + pub_item_metadata + issue_item_metadata + """ + + first_page, last_page = parse_pages(pages) + if first_page is None: + return None + + # fetch full metadata from API + issue_meta = self.ia_client.get_metadata(issue_db_row.issue_item) + # XXX: pub_meta = self.ia_client.get_metadata(issue_db_row.pub_collection) + pub_meta = None + + leaf_list = [] + assert 'page_numbers' in issue_meta + for entry in issue_meta['page_numbers'].get('pages', []): + page_num = entry['pageNumber'] + if not (page_num and page_num.isdigit()): + continue + page_num = int(page_num) + if page_num >= first_page and page_num <= last_page: + leaf_list.append(entry['leafNum']) + + if not leaf_list: + return None + + page_texts: List[Dict[str, Any]] = [] + issue_item = self.ia_client.get_item(issue_db_row.issue_item) + issue_item_djvu = issue_item.get_file(issue_db_row.issue_item + "_djvu.xml") + + # override 'close()' method so we can still read out contents + djvu_bytes = io.BytesIO() + djvu_bytes.close = lambda: None # type: ignore + assert issue_item_djvu.download(fileobj=djvu_bytes) == True + djvu_bytes.seek(0) + djvu_xml = io.StringIO(djvu_bytes.read().decode("UTF-8")) + del(djvu_bytes) + + leaf_dict = djvu_extract_leaf_texts(djvu_xml, only_leaves=leaf_list) + + for leaf, raw_text in leaf_dict.items(): + page_texts.append(dict(page_number=leaf, raw_text=raw_text)) + + return dict( + issue_item=issue_db_row.issue_item, + pages=pages, + page_texts=page_texts, + release_ident=release_ident, + pub_item_metadata=pub_meta, + issue_item_metadata=issue_item.metadata, + ) + + def process_release_list(self, releases: List[ReleaseEntity]) -> IntermediateBundle: + """ + 1. find best accessible fatcat file + => fetch GROBID XML if available, else pdftotext if available + => link to thumbnail if available + 2. find best SIM microfilm copy available + """ + pref_idents = fulltext_pref_list(releases) + release_dict = dict([(r.ident, r) for r in releases]) + + #print(f"pref_idents={pref_idents}", file=sys.stderr) + + # find best accessible fatcat file + grobid_fulltext: Optional[Any] + pdftotext_fulltext: Optional[Any] + for ident in pref_idents: + release = release_dict[ident] + if not release.files: + continue + for fe in release.files: + if not fe.sha1 or fe.mimetype not in (None, "application/pdf"): + continue + grobid_fulltext = self.fetch_file_grobid(fe) + pdftotext_fulltext = self.fetch_file_pdftotext(fe) + if grobid_fulltext or pdftotext_fulltext: + break + if grobid_fulltext or pdftotext_fulltext: + break + + # find best accessible SIM metadata and fulltext + sim_fulltext: Optional[Any] = None + sim_issue: Optional[Any] = None + for ident in pref_idents: + release = release_dict[ident] + #print(f"{release.extra}\n{release.pages}", file=sys.stderr) + if not release.pages: + continue + # TODO: in the future, will use release.extra.ia.sim.sim_pubid for lookup + sim_issue = self.lookup_sim(release) + #print(f"release_{release.ident}: sim_issue={sim_issue}", file=sys.stderr) + if not sim_issue: + continue + # XXX: control flow tweak? + sim_fulltext = self.fetch_sim(sim_issue, release.pages, release.ident) + if sim_fulltext: + break + + return IntermediateBundle( + doc_type=DocType.work, + releases=releases, + biblio_release_ident=pref_idents[0], + grobid_fulltext=grobid_fulltext, + pdftotext_fulltext=pdftotext_fulltext, + sim_fulltext=sim_fulltext, + ) + + def run_releases(self, release_stream: Sequence[str]): + """ + Iterates over the stream of releases, which are expected to be grouped + (sorted) by work_ident. + + Collects releases under same work_ident into a batch and processes a + work from that. + + TODO: what is the right API here? stream iterator? how should + parallelism work? + """ + batch = [] + batch_work_id = None + for line in release_stream: + if not line: + continue + release = entity_from_json(line, ReleaseEntity) + if release.work_id == batch_work_id: + batch.append(release) + continue + if batch: + ib = self.process_release_list(batch) + print(ib.json()) + batch_work_id = None + batch = [release,] + batch_work_id = release.work_id + + if batch: + ib = self.process_release_list(batch) + print(ib.json()) + +def main(): + """ + Run this command like: + + python -m fatcat_scholar.release_pipeline + """ + + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + subparsers = parser.add_subparsers() + + parser.add_argument("--issue-db-file", + help="sqlite3 database file to open", + default='data/issue_db.sqlite', + type=str) + + sub = subparsers.add_parser('run_releases', + help="takes expanded release entity JSON, sorted by work_ident") + sub.set_defaults(func='run_releases') + sub.add_argument("json_file", + help="release entities, as JSON-lines", + nargs='?', default=sys.stdin, type=argparse.FileType('r')) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do! (try --help)") + sys.exit(-1) + + idb = IssueDB(args.issue_db_file) + wp = WorkPipeline(issue_db=idb) + + if args.func == 'run_releases': + wp.run_releases(args.json_file) + else: + func = getattr(wp, args.func) + func() + +if __name__=="__main__": + main() -- cgit v1.2.3