import os import io import sys import sqlite3 import argparse import requests from pydantic import BaseModel, validator from typing import List, Dict, Tuple, Optional, Any, Sequence from fatcat_openapi_client import ReleaseEntity, FileEntity import internetarchive from fatcat_scholar.api_entities import * from fatcat_scholar.djvu import djvu_extract_leaf_texts from fatcat_scholar.sandcrawler import SandcrawlerPostgrestClient, SandcrawlerMinioClient from fatcat_scholar.issue_db import IssueDB, SimIssueRow from fatcat_scholar.schema import es_biblio_from_release, es_release_from_release, DocType, IntermediateBundle def truncate_pub_meta(full: Dict[str, Any]) -> Dict[str, Any]: """ Takes a complete archive.org metadata dictionary for a publication collection, and simplifies it by removing fields. Motivation is to make intermediate bundle files smaller. """ full.pop('files') if 'ulrichs' in full and full['ulrichs']: full['ulrichs'][0].pop('reviews_mfl') full['ulrichs'][0].pop('editorial_description') # these are interesting, but just too long full['ulrichs'][0].pop('online_availability_full_text') full['ulrichs'][0].pop('abstracting_indexing') full['ulrichs'][0].pop('publisher_and_ordering_details') return full def truncate_issue_meta(full: Dict[str, Any]) -> Dict[str, Any]: """ Same as truncate_pub_meta() but for issue item metadata """ full.pop('files') return full class SimPipeline(): def __init__(self, issue_db: IssueDB): self.issue_db: IssueDB = issue_db self.ia_client = internetarchive.get_session() def fetch_sim_issue(self, issue_db_row: Any) -> Optional[Any]: """ issue_item pages: str page_texts: list raw_text page_num leaf_num release_ident: Optional[str] pub_item_metadata issue_item_metadata """ # fetch full metadata from API issue_meta = self.ia_client.get_metadata(issue_db_row['issue_item']) pub_meta = self.ia_client.get_metadata(issue_db_row['pub_collection']) leaf_index = dict() leaf_list = [] if not 'page_numbers' in issue_meta: # TODO: warn return None for entry in issue_meta['page_numbers'].get('pages', []): page_num = entry['pageNumber'] leaf_index[entry['leafNum']] = page_num if not (page_num and page_num.isdigit()): continue page_num = int(page_num) leaf_list.append(entry['leafNum']) if not leaf_list: return None page_texts: List[Dict[str, Any]] = [] issue_item = self.ia_client.get_item(issue_db_row['issue_item']) issue_item_djvu = issue_item.get_file(issue_db_row['issue_item'] + "_djvu.xml") # override 'close()' method so we can still read out contents djvu_bytes = io.BytesIO() djvu_bytes.close = lambda: None # type: ignore assert issue_item_djvu.download(fileobj=djvu_bytes) == True djvu_bytes.seek(0) djvu_xml = io.StringIO(djvu_bytes.read().decode("UTF-8")) del(djvu_bytes) leaf_dict = djvu_extract_leaf_texts(djvu_xml) for leaf_num, raw_text in leaf_dict.items(): page_texts.append(dict(page_num=leaf_index.get(leaf_num), leaf_num=leaf_num, raw_text=raw_text)) return dict( issue_item=issue_db_row['issue_item'], pages=None, page_texts=page_texts, release_ident=None, pub_item_metadata=truncate_pub_meta(pub_meta), issue_item_metadata=truncate_issue_meta(issue_meta), ) def run_issue_db(self, limit: int = None): count = 0 self.issue_db.db.row_factory = sqlite3.Row cur = self.issue_db.db.cursor() for row in cur.execute('SELECT * FROM sim_issue LEFT JOIN sim_pub ON sim_issue.sim_pubid = sim_pub.sim_pubid WHERE sim_issue.release_count < 3'): # filter out "contents" and "index" items # TODO: more filters; also redundant with IssueDB code? if row['issue_item'].endswith('_contents') or row['issue_item'].endswith('_index'): continue try: full_issue = self.fetch_sim_issue(row) except requests.exceptions.ReadTimeout as e: print(str(e), file=sys.stderr) continue if not full_issue: continue for leaf in full_issue['page_texts']: bundle = IntermediateBundle( doc_type=DocType.sim_page, releases=[], biblio_release_ident=None, grobid_fulltext=None, pdftotext_fulltext=None, sim_fulltext=dict( issue_item=full_issue['issue_item'], pages=str(leaf['page_num']), page_texts=[leaf], release_ident=None, pub_item_metadata=full_issue['pub_item_metadata'], issue_item_metadata=full_issue['issue_item_metadata'], ) ) print(bundle.json()) count += 1 if limit is not None and count >= limit: break if limit is not None and count >= limit: break def main(): """ Run this command like: python -m fatcat_scholar.sim_pipeline """ parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter) subparsers = parser.add_subparsers() parser.add_argument("--issue-db-file", help="sqlite3 database file to open", default='data/issue_db.sqlite', type=str) sub = subparsers.add_parser('run_issue_db', help="iterates through entire IssueDB") sub.set_defaults(func='run_issue_db') sub.add_argument("--limit", help="maximum number of pages to index", type=int) args = parser.parse_args() if not args.__dict__.get("func"): print("tell me what to do! (try --help)") sys.exit(-1) sp = SimPipeline(issue_db=IssueDB(args.issue_db_file)) if args.func == 'run_issue_db': sp.run_issue_db(limit=args.limit) else: func = getattr(sp, args.func) func() if __name__=="__main__": main()