aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/pdftrio.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/pdftrio.py')
-rw-r--r--  python/sandcrawler/pdftrio.py | 185
1 file changed, 65 insertions, 120 deletions
diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py
index 41eed19..112df6a 100644
--- a/python/sandcrawler/pdftrio.py
+++ b/python/sandcrawler/pdftrio.py
@@ -1,19 +1,19 @@
-
import time
+from typing import Any, Dict, Optional
+
import requests
-from .workers import SandcrawlerWorker
+from .ia import WaybackClient
from .misc import gen_file_metadata, requests_retry_session
-from .ia import WaybackClient, WaybackError, PetaboxError
+from .workers import SandcrawlerFetchWorker, SandcrawlerWorker
class PdfTrioClient(object):
-
- def __init__(self, host_url="http://pdftrio.qa.fatcat.wiki", **kwargs):
+ def __init__(self, host_url: str = "http://pdftrio.qa.fatcat.wiki", **kwargs):
self.host_url = host_url
self.http_session = requests_retry_session(retries=3, backoff_factor=3)
- def classify_pdf(self, blob, mode="auto"):
+ def classify_pdf(self, blob: bytes, mode: str = "auto") -> Dict[str, Any]:
"""
Returns a dict with at least:
@@ -26,172 +26,117 @@ class PdfTrioClient(object):
appropriately; an optional `error_msg` may also be set. For some other
errors, like connection failure, an exception is raised.
"""
- assert blob
+ assert blob and type(blob) == bytes
try:
- pdftrio_response = requests.post(
+ pdftrio_response = self.http_session.post(
self.host_url + "/classify/research-pub/" + mode,
files={
- 'pdf_content': blob,
+ "pdf_content": blob,
},
timeout=60.0,
)
except requests.Timeout:
return {
- 'status': 'error-timeout',
- 'status_code': -4, # heritrix3 "HTTP timeout" code
- 'error_msg': 'pdftrio request (HTTP POST) timeout',
+ "status": "error-timeout",
+ "status_code": -4, # heritrix3 "HTTP timeout" code
+ "error_msg": "pdftrio request (HTTP POST) timeout",
}
except requests.exceptions.ConnectionError:
# crude back-off
time.sleep(2.0)
return {
- 'status': 'error-connect',
- 'status_code': -2, # heritrix3 "HTTP connect" code
- 'error_msg': 'pdftrio request connection timout',
+ "status": "error-connect",
+ "status_code": -2, # heritrix3 "HTTP connect" code
+ "error_msg": "pdftrio request connection timeout",
}
- info = dict(
- status_code=pdftrio_response.status_code,
- )
+ info: Dict[str, Any] = dict(status_code=pdftrio_response.status_code)
if pdftrio_response.status_code == 200:
resp_json = pdftrio_response.json()
- assert 'ensemble_score' in resp_json
- assert 'status' in resp_json
- assert 'versions' in resp_json
+ assert "ensemble_score" in resp_json
+ assert "status" in resp_json
+ assert "versions" in resp_json
info.update(resp_json)
else:
- info['status'] = 'error'
+ info["status"] = "error"
# TODO: might return JSON with some info?
- info['_total_sec'] = pdftrio_response.elapsed.total_seconds()
+ info["_total_sec"] = pdftrio_response.elapsed.total_seconds()
return info
-class PdfTrioWorker(SandcrawlerWorker):
+class PdfTrioWorker(SandcrawlerFetchWorker):
"""
This class is basically copied directly from GrobidWorker
"""
- def __init__(self, pdftrio_client, wayback_client=None, sink=None, **kwargs):
- super().__init__()
+ def __init__(
+ self,
+ pdftrio_client: PdfTrioClient,
+ wayback_client: Optional[WaybackClient] = None,
+ sink: Optional[SandcrawlerWorker] = None,
+ **kwargs
+ ):
+ super().__init__(wayback_client=wayback_client, **kwargs)
self.pdftrio_client = pdftrio_client
- self.wayback_client = wayback_client
self.sink = sink
- def process(self, record):
+ def process(self, record: Any, key: Optional[str] = None) -> Any:
start_process = time.time()
- default_key = record['sha1hex']
- wayback_sec = None
- petabox_sec = None
- if record.get('warc_path') and record.get('warc_offset'):
- # it's a full CDX dict. fetch using WaybackClient
- if not self.wayback_client:
- raise Exception("wayback client not configured for this PdfTrioWorker")
- try:
- start = time.time()
- blob = self.wayback_client.fetch_petabox_body(
- csize=record['warc_csize'],
- offset=record['warc_offset'],
- warc_path=record['warc_path'],
- )
- wayback_sec = time.time() - start
- except (WaybackError, PetaboxError) as we:
- return dict(
- key=default_key,
- source=record,
- pdf_trio=dict(
- status="error-wayback",
- error_msg=str(we),
- ),
- )
- elif record.get('url') and record.get('datetime'):
- # it's a partial CDX dict or something? fetch using WaybackClient
- if not self.wayback_client:
- raise Exception("wayback client not configured for this PdfTrioWorker")
- try:
- start = time.time()
- blob = self.wayback_client.fetch_replay_body(
- url=record['url'],
- datetime=record['datetime'],
- )
- wayback_sec = time.time() - start
- except WaybackError as we:
- return dict(
- key=default_key,
- source=record,
- pdf_trio=dict(
- status="error-wayback",
- error_msg=str(we),
- ),
- )
- elif record.get('item') and record.get('path'):
- # it's petabox link; fetch via HTTP
- start = time.time()
- resp = requests.get("https://archive.org/serve/{}/{}".format(
- record['item'], record['path']))
- petabox_sec = time.time() - start
- try:
- resp.raise_for_status()
- except Exception as e:
- return dict(
- key=default_key,
- source=record,
- pdf_trio=dict(
- status="error-petabox",
- error_msg=str(e),
- ),
- )
- blob = resp.content
- else:
- raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
- if not blob:
- return dict(
- key=default_key,
- source=record,
- pdf_trio=dict(
- status="error",
- error_msg="empty blob",
- ),
- )
+ fetch_sec = None
+
+ start = time.time()
+ fetch_result = self.fetch_blob(record)
+ fetch_sec = time.time() - start
+ if fetch_result["status"] != "success":
+ return fetch_result
+ blob: bytes = fetch_result["blob"]
+ assert blob and isinstance(blob, bytes)
+
result = dict()
- result['file_meta'] = gen_file_metadata(blob)
- result['key'] = result['file_meta']['sha1hex']
- result['pdf_trio'] = self.pdftrio_client.classify_pdf(blob)
- result['source'] = record
- result['timing'] = dict(
- pdftrio_sec=result['pdf_trio'].pop('_total_sec', None),
+ result["file_meta"] = gen_file_metadata(blob)
+ result["key"] = result["file_meta"]["sha1hex"]
+ result["pdf_trio"] = self.pdftrio_client.classify_pdf(blob)
+ result["source"] = record
+ result["timing"] = dict(
+ pdftrio_sec=result["pdf_trio"].pop("_total_sec", None),
total_sec=time.time() - start_process,
)
- if wayback_sec:
- result['timing']['wayback_sec'] = wayback_sec
- if petabox_sec:
- result['timing']['petabox_sec'] = wayback_sec
+ if fetch_sec:
+ result["timing"]["fetch_sec"] = fetch_sec
return result
+
class PdfTrioBlobWorker(SandcrawlerWorker):
"""
This is sort of like PdfTrioWorker, except it receives blobs directly,
instead of fetching blobs from some remote store.
"""
- def __init__(self, pdftrio_client, sink=None, mode="auto", **kwargs):
- super().__init__()
+ def __init__(
+ self,
+ pdftrio_client: PdfTrioClient,
+ sink: Optional[SandcrawlerWorker] = None,
+ mode: str = "auto",
+ **kwargs
+ ):
+ super().__init__(**kwargs)
self.pdftrio_client = pdftrio_client
self.sink = sink
self.mode = mode
- def process(self, blob):
+ def process(self, blob: Any, key: Optional[str] = None) -> Any:
start_process = time.time()
if not blob:
return None
+ assert isinstance(blob, bytes)
result = dict()
- result['file_meta'] = gen_file_metadata(blob)
- result['key'] = result['file_meta']['sha1hex']
- result['pdf_trio'] = self.pdftrio_client.classify_pdf(blob, mode=self.mode)
- result['timing'] = dict(
- pdftrio_sec=result['pdf_trio'].pop('_total_sec', None),
+ result["file_meta"] = gen_file_metadata(blob)
+ result["key"] = result["file_meta"]["sha1hex"]
+ result["pdf_trio"] = self.pdftrio_client.classify_pdf(blob, mode=self.mode)
+ result["timing"] = dict(
+ pdftrio_sec=result["pdf_trio"].pop("_total_sec", None),
total_sec=time.time() - start_process,
)
return result
-