Diffstat (limited to 'fatcat_scholar/work_pipeline.py')
-rw-r--r--  fatcat_scholar/work_pipeline.py  188
1 file changed, 120 insertions(+), 68 deletions(-)
diff --git a/fatcat_scholar/work_pipeline.py b/fatcat_scholar/work_pipeline.py
index 46e40e1..af558a3 100644
--- a/fatcat_scholar/work_pipeline.py
+++ b/fatcat_scholar/work_pipeline.py
@@ -1,4 +1,3 @@
-
import os
import io
import sys
@@ -12,9 +11,17 @@ import internetarchive
from fatcat_scholar.api_entities import *
from fatcat_scholar.djvu import djvu_extract_leaf_texts
-from fatcat_scholar.sandcrawler import SandcrawlerPostgrestClient, SandcrawlerMinioClient
+from fatcat_scholar.sandcrawler import (
+ SandcrawlerPostgrestClient,
+ SandcrawlerMinioClient,
+)
from fatcat_scholar.issue_db import IssueDB, SimIssueRow, SimPubRow
-from fatcat_scholar.schema import es_biblio_from_release, es_release_from_release, DocType, IntermediateBundle
+from fatcat_scholar.schema import (
+ es_biblio_from_release,
+ es_release_from_release,
+ DocType,
+ IntermediateBundle,
+)
from fatcat_scholar.sim_pipeline import truncate_pub_meta, truncate_issue_meta
@@ -25,17 +32,18 @@ def parse_pages(raw: str) -> Tuple[Optional[int], Optional[int]]:
first = int(first_raw)
if not "-" in raw:
return (first, first)
- last_raw = raw.split('-')[-1]
+ last_raw = raw.split("-")[-1]
if not last_raw.isdigit():
return (first, first)
last = int(last_raw)
if last < first:
- last_munge = first_raw[0:(len(first_raw)-len(last_raw))] + last_raw
+ last_munge = first_raw[0 : (len(first_raw) - len(last_raw))] + last_raw
last = int(last_munge)
if last < first:
return (first, first)
return (first, last)
+
def test_parse_pages():
assert parse_pages("479-89") == (479, 489)
assert parse_pages("466-7") == (466, 467)
@@ -52,24 +60,33 @@ def fulltext_pref_list(releases: List[ReleaseEntity]) -> List[str]:
Returns a list of release idents in preference order (best first) to
try and find fulltext for.
"""
- releases_sorted = sorted(releases, reverse=True, key=lambda r: (
- r.release_stage=="updated",
- r.release_stage=="published",
- r.volume is not None,
- r.container_id is not None,
- r.ext_ids.pmid is not None,
- r.release_stage=="submitted",
- r.release_type is not None,
- r.release_year,
- r.release_date,
- r.version,
- ))
+ releases_sorted = sorted(
+ releases,
+ reverse=True,
+ key=lambda r: (
+ r.release_stage == "updated",
+ r.release_stage == "published",
+ r.volume is not None,
+ r.container_id is not None,
+ r.ext_ids.pmid is not None,
+ r.release_stage == "submitted",
+ r.release_type is not None,
+ r.release_year,
+ r.release_date,
+ r.version,
+ ),
+ )
return [r.ident for r in releases_sorted]
-class WorkPipeline():
-
- def __init__(self, issue_db: IssueDB, sandcrawler_db_client: SandcrawlerPostgrestClient, sandcrawler_s3_client: SandcrawlerMinioClient, fulltext_cache_dir=None):
+class WorkPipeline:
+ def __init__(
+ self,
+ issue_db: IssueDB,
+ sandcrawler_db_client: SandcrawlerPostgrestClient,
+ sandcrawler_s3_client: SandcrawlerMinioClient,
+ fulltext_cache_dir=None,
+ ):
self.issue_db: IssueDB = issue_db
self.ia_client = internetarchive.get_session()
self.sandcrawler_db_client = sandcrawler_db_client
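A side note, not in the patch: the reverse tuple sort in fulltext_pref_list() puts updated/published releases with richer metadata first. A toy example, assuming the function is importable from this module; the SimpleNamespace objects are made-up stand-ins for fatcat ReleaseEntity:

from types import SimpleNamespace
from fatcat_scholar.work_pipeline import fulltext_pref_list

# two fabricated releases of the same work: a bare preprint and a published version
preprint = SimpleNamespace(
    ident="aaaa", release_stage="submitted", volume=None, container_id=None,
    ext_ids=SimpleNamespace(pmid=None), release_type="article-journal",
    release_year=2019, release_date=None, version=None,
)
published = SimpleNamespace(
    ident="bbbb", release_stage="published", volume="12", container_id="cccc",
    ext_ids=SimpleNamespace(pmid="12345"), release_type="article-journal",
    release_year=2020, release_date=None, version=None,
)
# the published release wins every boolean in the sort key, so it sorts first
assert fulltext_pref_list([preprint, published]) == ["bbbb", "aaaa"]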
@@ -87,9 +104,9 @@ class WorkPipeline():
if not fe.urls:
return None
grobid_meta = self.sandcrawler_db_client.get_grobid(fe.sha1)
- if not grobid_meta or grobid_meta['status'] != 'success':
+ if not grobid_meta or grobid_meta["status"] != "success":
return None
- #print(grobid_meta)
+ # print(grobid_meta)
try:
grobid_xml = self.sandcrawler_s3_client.get_blob(
folder="grobid",
@@ -98,13 +115,11 @@ class WorkPipeline():
prefix="",
bucket="sandcrawler",
)
- #print(grobid_xml)
+ # print(grobid_xml)
except minio.error.NoSuchKey:
return None
return dict(
- tei_xml=grobid_xml,
- release_ident=release_ident,
- file_ident=fe.ident,
+ tei_xml=grobid_xml, release_ident=release_ident, file_ident=fe.ident,
)
def fetch_file_pdftotext(self, fe: FileEntity, release_ident: str) -> Optional[Any]:
@@ -115,14 +130,14 @@ class WorkPipeline():
"""
# HACK: look for local pdftotext output
if self.fulltext_cache_dir:
- local_txt_path = f"{self.fulltext_cache_dir}/pdftotext/{fe.sha1[:2]}/{fe.sha1}.txt"
+ local_txt_path = (
+ f"{self.fulltext_cache_dir}/pdftotext/{fe.sha1[:2]}/{fe.sha1}.txt"
+ )
try:
- with open(local_txt_path, 'r') as txt_file:
+ with open(local_txt_path, "r") as txt_file:
raw_text = txt_file.read()
return dict(
- raw_text=raw_text,
- release_ident=release_ident,
- file_ident=fe.ident,
+ raw_text=raw_text, release_ident=release_ident, file_ident=fe.ident,
)
except FileNotFoundError:
pass
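An aside, not in the patch: fetch_file_pdftotext() expects a local cache sharded by the first two hex characters of the file SHA-1. A small sketch of that layout, using a hypothetical cached_text_path() helper:

import os

def cached_text_path(cache_dir: str, sha1: str) -> str:
    # hypothetical helper mirroring the f-string path above
    return os.path.join(cache_dir, "pdftotext", sha1[:2], f"{sha1}.txt")

# cached_text_path("/srv/fulltext", "da39a3ee5e6b4b0d3255bfef95601890afd80709")
#   -> "/srv/fulltext/pdftotext/da/da39a3ee5e6b4b0d3255bfef95601890afd80709.txt"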
@@ -144,9 +159,17 @@ class WorkPipeline():
if not sim_pubid:
return None
- return self.issue_db.lookup_issue(sim_pubid=sim_pubid, volume=release.volume, issue=release.issue)
+ return self.issue_db.lookup_issue(
+ sim_pubid=sim_pubid, volume=release.volume, issue=release.issue
+ )
- def fetch_sim(self, issue_db_row: SimIssueRow, issue_db_pub_row: SimPubRow, pages: str, release_ident: str) -> Optional[Any]:
+ def fetch_sim(
+ self,
+ issue_db_row: SimIssueRow,
+ issue_db_pub_row: SimPubRow,
+ pages: str,
+ release_ident: str,
+ ) -> Optional[Any]:
"""
issue_item
pages: str
@@ -169,17 +192,17 @@ class WorkPipeline():
leaf_index = dict()
leaf_list = []
- if not 'page_numbers' in issue_meta:
+ if not "page_numbers" in issue_meta:
# TODO: warn
return None
- for entry in issue_meta['page_numbers'].get('pages', []):
- page_num = entry['pageNumber']
- leaf_index[entry['leafNum']] = page_num
+ for entry in issue_meta["page_numbers"].get("pages", []):
+ page_num = entry["pageNumber"]
+ leaf_index[entry["leafNum"]] = page_num
if not (page_num and page_num.isdigit()):
continue
page_num = int(page_num)
if page_num >= first_page and page_num <= last_page:
- leaf_list.append(entry['leafNum'])
+ leaf_list.append(entry["leafNum"])
if not leaf_list:
return None
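An aside, not in the patch: a toy walk-through of how the scandata "page_numbers" block becomes a leaf_index and a leaf_list for an article spanning printed pages 12-13 (fabricated metadata, same shape as consumed above):

issue_meta = {
    "page_numbers": {
        "pages": [
            {"leafNum": 7, "pageNumber": "11"},
            {"leafNum": 8, "pageNumber": "12"},
            {"leafNum": 9, "pageNumber": "13"},
            {"leafNum": 10, "pageNumber": ""},  # unnumbered plate
        ]
    }
}
first_page, last_page = 12, 13
leaf_index, leaf_list = {}, []
for entry in issue_meta["page_numbers"]["pages"]:
    page_num = entry["pageNumber"]
    leaf_index[entry["leafNum"]] = page_num
    # only leaves whose printed page falls inside the article's page range
    if page_num.isdigit() and first_page <= int(page_num) <= last_page:
        leaf_list.append(entry["leafNum"])
assert leaf_list == [8, 9]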
@@ -190,16 +213,22 @@ class WorkPipeline():
# override 'close()' method so we can still read out contents
djvu_bytes = io.BytesIO()
- djvu_bytes.close = lambda: None # type: ignore
+ djvu_bytes.close = lambda: None # type: ignore
assert issue_item_djvu.download(fileobj=djvu_bytes) == True
djvu_bytes.seek(0)
djvu_xml = io.StringIO(djvu_bytes.read().decode("UTF-8"))
- del(djvu_bytes)
+ del djvu_bytes
leaf_dict = djvu_extract_leaf_texts(djvu_xml, only_leaves=leaf_list)
for leaf_num, raw_text in leaf_dict.items():
- page_texts.append(dict(page_num=leaf_index.get(leaf_num), leaf_num=leaf_num, raw_text=raw_text))
+ page_texts.append(
+ dict(
+ page_num=leaf_index.get(leaf_num),
+ leaf_num=leaf_num,
+ raw_text=raw_text,
+ )
+ )
return dict(
issue_item=issue_db_row.issue_item,
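An aside, not in the patch: the close() override above keeps the in-memory buffer readable after internetarchive's download(fileobj=...) closes it. A minimal sketch of the trick, with a hypothetical write_and_close() standing in for the download call:

import io

def write_and_close(fileobj):
    # stand-in for a helper that unconditionally closes its file object
    fileobj.write(b"<DjVuXML/>")
    fileobj.close()

buf = io.BytesIO()
buf.close = lambda: None  # neutralize close(), same trick as in fetch_sim()
write_and_close(buf)
buf.seek(0)
assert buf.read() == b"<DjVuXML/>"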
@@ -220,7 +249,7 @@ class WorkPipeline():
pref_idents = fulltext_pref_list(releases)
release_dict = dict([(r.ident, r) for r in releases])
- #print(f"pref_idents={pref_idents}", file=sys.stderr)
+ # print(f"pref_idents={pref_idents}", file=sys.stderr)
# find best accessible fatcat file
grobid_fulltext: Optional[Any] = None
@@ -244,12 +273,12 @@ class WorkPipeline():
sim_issue: Optional[Any] = None
for ident in pref_idents:
release = release_dict[ident]
- #print(f"{release.extra}\n{release.pages}", file=sys.stderr)
+ # print(f"{release.extra}\n{release.pages}", file=sys.stderr)
if not release.pages:
continue
# TODO: in the future, will use release.extra.ia.sim.sim_pubid for lookup
sim_issue = self.lookup_sim(release)
- #print(f"release_{release.ident}: sim_issue={sim_issue}", file=sys.stderr)
+ # print(f"release_{release.ident}: sim_issue={sim_issue}", file=sys.stderr)
if not sim_issue:
continue
sim_pub = self.issue_db.lookup_pub(sim_issue.sim_pubid)
@@ -257,7 +286,9 @@ class WorkPipeline():
continue
# XXX: control flow tweak?
try:
- sim_fulltext = self.fetch_sim(sim_issue, sim_pub, release.pages, release.ident)
+ sim_fulltext = self.fetch_sim(
+ sim_issue, sim_pub, release.pages, release.ident
+ )
except requests.exceptions.ConnectionError as e:
print(str(e), file=sys.stderr)
continue
@@ -300,13 +331,16 @@ class WorkPipeline():
ib = self.process_release_list(batch)
print(ib.json())
batch_work_id = None
- batch = [release,]
+ batch = [
+ release,
+ ]
batch_work_id = release.work_id
if batch:
ib = self.process_release_list(batch)
print(ib.json())
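An aside, not in the patch: the loop above accumulates JSON-lines releases into per-work batches. A minimal sketch of that grouping pattern, assuming the input is already sorted by work_id and using a hypothetical process() callback in place of process_release_list():

def group_by_work(releases, process):
    batch, batch_work_id = [], None
    for release in releases:
        # a new work_id closes out the current batch
        if batch and release.work_id != batch_work_id:
            process(batch)
            batch = []
        batch.append(release)
        batch_work_id = release.work_id
    if batch:
        process(batch)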
+
def main():
"""
Run this command like:
@@ -315,31 +349,46 @@ def main():
"""
parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter
+ )
subparsers = parser.add_subparsers()
- parser.add_argument("--issue-db-file",
+ parser.add_argument(
+ "--issue-db-file",
help="sqlite3 database file to open",
- default='data/issue_db.sqlite',
- type=str)
- parser.add_argument("--sandcrawler-db-api",
+ default="data/issue_db.sqlite",
+ type=str,
+ )
+ parser.add_argument(
+ "--sandcrawler-db-api",
help="Sandcrawler Postgrest API endpoint",
- default='http://aitio.us.archive.org:3030',
- type=str)
- parser.add_argument("--sandcrawler-s3-api",
+ default="http://aitio.us.archive.org:3030",
+ type=str,
+ )
+ parser.add_argument(
+ "--sandcrawler-s3-api",
help="Sandcrawler S3 (minio/seaweedfs) API endpoint",
- default='aitio.us.archive.org:9000',
- type=str)
+ default="aitio.us.archive.org:9000",
+ type=str,
+ )
- sub = subparsers.add_parser('run_releases',
- help="takes expanded release entity JSON, sorted by work_ident")
- sub.set_defaults(func='run_releases')
- sub.add_argument("json_file",
+ sub = subparsers.add_parser(
+ "run_releases", help="takes expanded release entity JSON, sorted by work_ident"
+ )
+ sub.set_defaults(func="run_releases")
+ sub.add_argument(
+ "json_file",
help="release entities, as JSON-lines",
- nargs='?', default=sys.stdin, type=argparse.FileType('r'))
- sub.add_argument("--fulltext-cache-dir",
+ nargs="?",
+ default=sys.stdin,
+ type=argparse.FileType("r"),
+ )
+ sub.add_argument(
+ "--fulltext-cache-dir",
help="path of local directory with pdftotext fulltext (and thumbnails)",
- default=None, type=str)
+ default=None,
+ type=str,
+ )
args = parser.parse_args()
if not args.__dict__.get("func"):
@@ -348,20 +397,23 @@ def main():
wp = WorkPipeline(
issue_db=IssueDB(args.issue_db_file),
- sandcrawler_db_client=SandcrawlerPostgrestClient(api_url=args.sandcrawler_db_api),
+ sandcrawler_db_client=SandcrawlerPostgrestClient(
+ api_url=args.sandcrawler_db_api
+ ),
sandcrawler_s3_client=SandcrawlerMinioClient(
host_url=args.sandcrawler_s3_api,
- access_key=os.environ.get('MINIO_ACCESS_KEY'),
- secret_key=os.environ.get('MINIO_SECRET_KEY'),
+ access_key=os.environ.get("MINIO_ACCESS_KEY"),
+ secret_key=os.environ.get("MINIO_SECRET_KEY"),
),
fulltext_cache_dir=args.fulltext_cache_dir,
)
- if args.func == 'run_releases':
+ if args.func == "run_releases":
wp.run_releases(args.json_file)
else:
func = getattr(wp, args.func)
func()
-if __name__=="__main__":
+
+if __name__ == "__main__":
main()