diff options
Diffstat (limited to 'python/sandcrawler/pdftrio.py')
-rw-r--r-- | python/sandcrawler/pdftrio.py | 185 |
1 files changed, 65 insertions, 120 deletions
class PdfTrioClient(object):
    """
    Thin HTTP client for the pdf_trio classification service.

    Uses a retrying requests session (3 retries, exponential backoff) for all
    calls against `host_url`.
    """

    def __init__(self, host_url: str = "http://pdftrio.qa.fatcat.wiki", **kwargs):
        self.host_url = host_url
        self.http_session = requests_retry_session(retries=3, backoff_factor=3)

    def classify_pdf(self, blob: bytes, mode: str = "auto") -> Dict[str, Any]:
        """
        POST the PDF `blob` to the pdftrio service and return its verdict.

        Returns a dict with at least:

        - status_code: int HTTP status from the service (or a negative
          heritrix3-style pseudo-code for timeout/connect failures)
        - status: "error-timeout" / "error-connect" / "error", or the
          service-reported status on success

        On HTTP 200 the service JSON (ensemble_score, status, versions, ...)
        is merged into the result, plus `_total_sec` request wall time. On
        request-level errors the dict also carries `error_msg`; other errors,
        like unexpected connection failure, raise an exception.
        """
        # isinstance() is the idiomatic type check (was: type(blob) == bytes)
        assert blob and isinstance(blob, bytes)
        try:
            pdftrio_response = self.http_session.post(
                self.host_url + "/classify/research-pub/" + mode,
                files={
                    "pdf_content": blob,
                },
                timeout=60.0,
            )
        except requests.Timeout:
            return {
                "status": "error-timeout",
                "status_code": -4,  # heritrix3 "HTTP timeout" code
                "error_msg": "pdftrio request (HTTP POST) timeout",
            }
        except requests.exceptions.ConnectionError:
            # crude back-off so a retrying caller doesn't hammer a down service
            time.sleep(2.0)
            return {
                "status": "error-connect",
                "status_code": -2,  # heritrix3 "HTTP connect" code
                "error_msg": "pdftrio request connection timeout",
            }

        info: Dict[str, Any] = dict(status_code=pdftrio_response.status_code)
        if pdftrio_response.status_code == 200:
            resp_json = pdftrio_response.json()
            # sanity-check the service contract before merging
            assert "ensemble_score" in resp_json
            assert "status" in resp_json
            assert "versions" in resp_json
            info.update(resp_json)
        else:
            info["status"] = "error"
            # TODO: might return JSON with some info?
        info["_total_sec"] = pdftrio_response.elapsed.total_seconds()
        return info
class PdfTrioWorker(SandcrawlerFetchWorker):
    """
    Fetches PDF blobs (via the SandcrawlerFetchWorker base, using the wayback
    client) and runs them through the pdftrio classifier.

    This class is basically copied directly from GrobidWorker
    """

    def __init__(
        self,
        pdftrio_client: PdfTrioClient,
        wayback_client: Optional[WaybackClient] = None,
        sink: Optional[SandcrawlerWorker] = None,
        **kwargs
    ):
        super().__init__(wayback_client=wayback_client, **kwargs)
        self.pdftrio_client = pdftrio_client
        self.sink = sink

    def process(self, record: Any, key: Optional[str] = None) -> Any:
        """
        Fetch the blob described by `record`, classify it, and return a result
        dict (file_meta, key, pdf_trio, source, timing). On fetch failure the
        fetch_blob() error dict is returned unchanged.
        """
        start_process = time.time()

        # time the blob fetch separately from classification
        # (removed dead `fetch_sec = None` init; it was always overwritten)
        start = time.time()
        fetch_result = self.fetch_blob(record)
        fetch_sec = time.time() - start
        if fetch_result["status"] != "success":
            return fetch_result
        blob: bytes = fetch_result["blob"]
        assert blob and isinstance(blob, bytes)

        result = dict()
        result["file_meta"] = gen_file_metadata(blob)
        result["key"] = result["file_meta"]["sha1hex"]
        result["pdf_trio"] = self.pdftrio_client.classify_pdf(blob)
        result["source"] = record
        result["timing"] = dict(
            pdftrio_sec=result["pdf_trio"].pop("_total_sec", None),
            total_sec=time.time() - start_process,
        )
        # fetch_sec is always a float here; the old `if fetch_sec:` truthiness
        # test would have dropped a legitimate 0.0 measurement
        result["timing"]["fetch_sec"] = fetch_sec
        return result
class PdfTrioBlobWorker(SandcrawlerWorker):
    """
    Like PdfTrioWorker, except it is handed PDF blobs directly instead of
    fetching them from a remote store.
    """

    def __init__(
        self,
        pdftrio_client: PdfTrioClient,
        sink: Optional[SandcrawlerWorker] = None,
        mode: str = "auto",
        **kwargs
    ):
        super().__init__(**kwargs)
        self.pdftrio_client = pdftrio_client
        self.sink = sink
        self.mode = mode

    def process(self, blob: Any, key: Optional[str] = None) -> Any:
        """
        Classify a single PDF blob; returns None for an empty blob, otherwise
        a dict with file_meta, key (sha1hex), pdf_trio, and timing.
        """
        started = time.time()
        if not blob:
            return None
        assert isinstance(blob, bytes)

        file_meta = gen_file_metadata(blob)
        pdf_trio = self.pdftrio_client.classify_pdf(blob, mode=self.mode)
        # service-side request time is carved out of the pdf_trio payload
        timing = dict(
            pdftrio_sec=pdf_trio.pop("_total_sec", None),
            total_sec=time.time() - started,
        )
        return {
            "file_meta": file_meta,
            "key": file_meta["sha1hex"],
            "pdf_trio": pdf_trio,
            "timing": timing,
        }