 fatcat_scholar/issue_db.py      |  31
 fatcat_scholar/work_pipeline.py | 305
 tests/test_djvu_parse.py        |   4
 3 files changed, 338 insertions, 2 deletions
diff --git a/fatcat_scholar/issue_db.py b/fatcat_scholar/issue_db.py
index 0d33e17..5278750 100644
--- a/fatcat_scholar/issue_db.py
+++ b/fatcat_scholar/issue_db.py
@@ -44,6 +44,19 @@ class SimIssueRow:
     def tuple(self):
         return (self.issue_item, self.sim_pubid, self.year, self.volume, self.issue, self.first_page, self.last_page, self.release_count)
 
+    @classmethod
+    def from_tuple(cls, row: Any):
+        return SimIssueRow(
+            issue_item=row[0],
+            sim_pubid=row[1],
+            year=row[2],
+            volume=row[3],
+            issue=row[4],
+            first_page=row[5],
+            last_page=row[6],
+            release_count=row[7],
+        )
+
 @dataclass
 class ReleaseCountsRow:
     sim_pubid: str
@@ -95,6 +108,7 @@ class IssueDB():
         """
         self.db = sqlite3.connect(db_file, isolation_level='EXCLUSIVE')
         self._pubid2container_map: Dict[str, Optional[str]] = dict()
+        self._container2pubid_map: Dict[str, Optional[str]] = dict()
 
     def init_db(self):
         self.db.executescript("""
@@ -135,6 +149,23 @@ class IssueDB():
             self._pubid2container_map[sim_pubid] = None
             return None
 
+    def container2pubid(self, container_ident: str) -> Optional[str]:
+        if container_ident in self._container2pubid_map:
+            return self._container2pubid_map[container_ident]
+        row = list(self.db.execute("SELECT sim_pubid FROM sim_pub WHERE container_ident = ?;", [container_ident]))
+        if row:
+            self._container2pubid_map[container_ident] = row[0][0]
+            return row[0][0]
+        else:
+            self._container2pubid_map[container_ident] = None
+            return None
+
+    def lookup_issue(self, sim_pubid: str, volume: str, issue: str) -> Optional[SimIssueRow]:
+        row = list(self.db.execute("SELECT * FROM sim_issue WHERE sim_pubid = ? AND volume = ? AND issue = ?;", [sim_pubid, volume, issue]))
+        if not row:
+            return None
+        return SimIssueRow.from_tuple(row[0])
+
     def load_pubs(self, json_lines: Sequence[str], api: Any):
         """
         Reads a file (or some other iterator) of JSON lines, parses them into a
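Taken together, the two new helpers give the lookup path the work pipeline below relies on: a fatcat container ident resolves to a SIM pubid, and pubid plus volume plus issue resolves to a concrete microfilm issue item. A minimal usage sketch (the container ident, volume/issue values, and database path are hypothetical; both helpers return None on a miss):

    from fatcat_scholar.issue_db import IssueDB

    idb = IssueDB("data/issue_db.sqlite")
    # hypothetical fatcat container ident
    sim_pubid = idb.container2pubid("containeridentplaceholder")
    if sim_pubid:
        row = idb.lookup_issue(sim_pubid=sim_pubid, volume="49", issue="4")
        if row:
            print(row.issue_item, row.first_page, row.last_page)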
+    """ +    releases_sorted = sorted(releases, reverse=True, key=lambda r: ( +        r.release_stage=="updated", +        r.release_stage=="published", +        r.volume is not None, +        r.container_id is not None, +        r.ext_ids.pmid is not None, +        r.release_stage=="submitted", +        r.release_type is not None, +        r.release_year, +        r.release_date, +        r.version, +    )) +    return [r.ident for r in releases_sorted] + + +class WorkPipeline(): + +    def __init__(self, issue_db: IssueDB): +        self.issue_db: IssueDB = issue_db +        self.ia_client = internetarchive.get_session() +        # TODO: postgrest client +        # TODO: minio client + +    def fetch_file_grobid(self, fe: FileEntity) -> Optional[Any]: +        """ +        tei_xml: str +        release_ident: Optional[str] +        file_ident: Optional[str] +        """ +        return None + +    def fetch_file_pdftotext(self, fe: FileEntity) -> Optional[Any]: +        """ +        raw_text: str +        release_ident: Optional[str] +        file_ident: Optional[str] +        """ +        return None + +    def lookup_sim(self, release: ReleaseEntity) -> Optional[SimIssueRow]: +        """ +        Checks in IssueDB to see if this release is likely to have a copy in a +        SIM issue item. + +        volume +        issue +        """ +        if not (release.container_id and release.volume and release.issue): +            return None +        sim_pubid = self.issue_db.container2pubid(release.container_id) +        if not sim_pubid: +            return None + +        return self.issue_db.lookup_issue(sim_pubid=sim_pubid, volume=release.volume, issue=release.issue) + +    def fetch_sim(self, issue_db_row: SimIssueRow, pages: str, release_ident: str) -> Optional[Any]: +        """ +        issue_item  +        pages: str +        page_texts: list +            page_number +            raw_text +        release_ident: Optional[str] +        pub_item_metadata +        issue_item_metadata +        """ + +        first_page, last_page = parse_pages(pages) +        if first_page is None: +            return None + +        # fetch full metadata from API +        issue_meta = self.ia_client.get_metadata(issue_db_row.issue_item) +        # XXX: pub_meta = self.ia_client.get_metadata(issue_db_row.pub_collection) +        pub_meta = None + +        leaf_list = [] +        assert 'page_numbers' in issue_meta +        for entry in issue_meta['page_numbers'].get('pages', []): +            page_num = entry['pageNumber'] +            if not (page_num and page_num.isdigit()): +                continue +            page_num = int(page_num) +            if page_num >= first_page and page_num <= last_page: +                leaf_list.append(entry['leafNum']) + +        if not leaf_list: +            return None + +        page_texts: List[Dict[str, Any]] = [] +        issue_item = self.ia_client.get_item(issue_db_row.issue_item) +        issue_item_djvu = issue_item.get_file(issue_db_row.issue_item + "_djvu.xml") + +        # override 'close()' method so we can still read out contents +        djvu_bytes = io.BytesIO() +        djvu_bytes.close = lambda: None     # type: ignore +        assert issue_item_djvu.download(fileobj=djvu_bytes) == True +        djvu_bytes.seek(0) +        djvu_xml = io.StringIO(djvu_bytes.read().decode("UTF-8")) +        del(djvu_bytes) + +        leaf_dict = djvu_extract_leaf_texts(djvu_xml, only_leaves=leaf_list) + +        for leaf, raw_text in leaf_dict.items(): +         
+
+
+class WorkPipeline():
+
+    def __init__(self, issue_db: IssueDB):
+        self.issue_db: IssueDB = issue_db
+        self.ia_client = internetarchive.get_session()
+        # TODO: postgrest client
+        # TODO: minio client
+
+    def fetch_file_grobid(self, fe: FileEntity) -> Optional[Any]:
+        """
+        tei_xml: str
+        release_ident: Optional[str]
+        file_ident: Optional[str]
+        """
+        return None
+
+    def fetch_file_pdftotext(self, fe: FileEntity) -> Optional[Any]:
+        """
+        raw_text: str
+        release_ident: Optional[str]
+        file_ident: Optional[str]
+        """
+        return None
+
+    def lookup_sim(self, release: ReleaseEntity) -> Optional[SimIssueRow]:
+        """
+        Checks in IssueDB to see if this release is likely to have a copy in a
+        SIM issue item.
+        """
+        if not (release.container_id and release.volume and release.issue):
+            return None
+        sim_pubid = self.issue_db.container2pubid(release.container_id)
+        if not sim_pubid:
+            return None
+        return self.issue_db.lookup_issue(sim_pubid=sim_pubid, volume=release.volume, issue=release.issue)
+
+    def fetch_sim(self, issue_db_row: SimIssueRow, pages: str, release_ident: str) -> Optional[Any]:
+        """
+        Returns a dict with keys:
+
+        issue_item
+        pages: str
+        page_texts: list
+            page_number
+            raw_text
+        release_ident: Optional[str]
+        pub_item_metadata
+        issue_item_metadata
+        """
+        first_page, last_page = parse_pages(pages)
+        if first_page is None:
+            return None
+
+        # fetch full metadata from API
+        issue_meta = self.ia_client.get_metadata(issue_db_row.issue_item)
+        # XXX: pub_meta = self.ia_client.get_metadata(issue_db_row.pub_collection)
+        pub_meta = None
+
+        leaf_list = []
+        assert 'page_numbers' in issue_meta
+        for entry in issue_meta['page_numbers'].get('pages', []):
+            page_num = entry['pageNumber']
+            if not (page_num and page_num.isdigit()):
+                continue
+            page_num = int(page_num)
+            if page_num >= first_page and page_num <= last_page:
+                leaf_list.append(entry['leafNum'])
+
+        if not leaf_list:
+            return None
+
+        page_texts: List[Dict[str, Any]] = []
+        issue_item = self.ia_client.get_item(issue_db_row.issue_item)
+        issue_item_djvu = issue_item.get_file(issue_db_row.issue_item + "_djvu.xml")
+
+        # override 'close()' method so we can still read out contents
+        djvu_bytes = io.BytesIO()
+        djvu_bytes.close = lambda: None     # type: ignore
+        assert issue_item_djvu.download(fileobj=djvu_bytes)
+        djvu_bytes.seek(0)
+        djvu_xml = io.StringIO(djvu_bytes.read().decode("UTF-8"))
+        del djvu_bytes
+
+        leaf_dict = djvu_extract_leaf_texts(djvu_xml, only_leaves=leaf_list)
+
+        for leaf, raw_text in leaf_dict.items():
+            page_texts.append(dict(page_number=leaf, raw_text=raw_text))
+
+        return dict(
+            issue_item=issue_db_row.issue_item,
+            pages=pages,
+            page_texts=page_texts,
+            release_ident=release_ident,
+            pub_item_metadata=pub_meta,
+            issue_item_metadata=issue_item.metadata,
+        )
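The close() override is the subtle part of fetch_sim: download(fileobj=...) evidently closes the file object it is handed when it finishes, which would discard the buffered DjVu XML before it can be decoded. The same workaround in isolation, with a hypothetical stand-in for the downloader (standard library only):

    import io

    def downloader(fileobj):
        # stand-in for issue_item_djvu.download(fileobj=...), which closes
        # the file object when it finishes
        fileobj.write(b"<xml/>")
        fileobj.close()

    buf = io.BytesIO()
    buf.close = lambda: None  # type: ignore  # neutralize the close() call
    downloader(buf)
    buf.seek(0)
    print(buf.read())  # b'<xml/>' -- contents survive the downloader's close()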
+        """ +        batch = [] +        batch_work_id = None +        for line in release_stream: +            if not line: +                continue +            release = entity_from_json(line, ReleaseEntity) +            if release.work_id == batch_work_id: +                batch.append(release) +                continue +            if batch: +                ib = self.process_release_list(batch) +                print(ib.json()) +                batch_work_id = None +            batch = [release,] +            batch_work_id = release.work_id + +        if batch: +            ib = self.process_release_list(batch) +            print(ib.json()) + +def main(): +    """ +    Run this command like: + +        python -m fatcat_scholar.release_pipeline +    """ + +    parser = argparse.ArgumentParser( +        formatter_class=argparse.ArgumentDefaultsHelpFormatter) +    subparsers = parser.add_subparsers() + +    parser.add_argument("--issue-db-file", +        help="sqlite3 database file to open", +        default='data/issue_db.sqlite', +        type=str) + +    sub = subparsers.add_parser('run_releases', +        help="takes expanded release entity JSON, sorted by work_ident") +    sub.set_defaults(func='run_releases') +    sub.add_argument("json_file", +        help="release entities, as JSON-lines", +        nargs='?', default=sys.stdin, type=argparse.FileType('r')) + +    args = parser.parse_args() +    if not args.__dict__.get("func"): +        print("tell me what to do! (try --help)") +        sys.exit(-1) + +    idb = IssueDB(args.issue_db_file) +    wp = WorkPipeline(issue_db=idb) + +    if args.func == 'run_releases': +        wp.run_releases(args.json_file) +    else: +        func = getattr(wp, args.func) +        func() + +if __name__=="__main__": +    main() diff --git a/tests/test_djvu_parse.py b/tests/test_djvu_parse.py index 66c2804..abf5a4c 100644 --- a/tests/test_djvu_parse.py +++ b/tests/test_djvu_parse.py @@ -1,5 +1,5 @@ -from io import StringIO +import io  from fatcat_scholar.djvu import djvu_extract_leaf_texts @@ -9,7 +9,7 @@ def test_djvu_extract_leaf_texts():      with open('tests/files/ERIC_ED441501_djvu.xml', 'r') as f:          blob = f.read() -    leaves = djvu_extract_leaf_texts(StringIO(blob), [3,6]) +    leaves = djvu_extract_leaf_texts(io.StringIO(blob), [3,6])      assert 3 in leaves      assert 6 in leaves      assert "2. Original cataloging tools" in leaves[3]  | 
