author    Bryan Newbold <bnewbold@archive.org>  2020-02-12 19:40:55 -0800
committer Bryan Newbold <bnewbold@archive.org>  2020-02-12 19:42:43 -0800
commit    94912e739c51d2fa4d5f9de878d0b0f0544a4459 (patch)
tree      af7803bee388beba7dd6dce2113e3632284537ac /python/sandcrawler
parent    6b3ce3169847a16fe6c0ab00f3a8af8b8ad099ab (diff)
pdftrio basic python code
This is basically just a copy/paste of GROBID code, only simpler!
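
For a sense of how the new code is used, here is a rough sketch of driving PdfTrioClient by hand (not part of this commit; the host URL and file path are placeholders):

    from sandcrawler import PdfTrioClient

    client = PdfTrioClient(host_url="http://localhost:3939")  # placeholder URL
    with open("paper.pdf", "rb") as f:  # placeholder input file
        result = client.classify_pdf(f.read())
    # 'status' and 'status_code' are always set; scores only come back on success
    print(result['status'], result.get('ensemble_score'))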
Diffstat (limited to 'python/sandcrawler')
-rw-r--r--  python/sandcrawler/__init__.py    3
-rw-r--r--  python/sandcrawler/db.py         57
-rw-r--r--  python/sandcrawler/pdftrio.py   158
-rw-r--r--  python/sandcrawler/persist.py    21
4 files changed, 238 insertions(+), 1 deletion(-)
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py
index 2d28829..b52d039 100644
--- a/python/sandcrawler/__init__.py
+++ b/python/sandcrawler/__init__.py
@@ -1,9 +1,10 @@
 from .grobid import GrobidClient, GrobidWorker, GrobidBlobWorker
+from .pdftrio import PdfTrioClient, PdfTrioWorker, PdfTrioBlobWorker
 from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime
 from .workers import KafkaSink, KafkaGrobidSink, JsonLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper
 from .ia import WaybackClient, WaybackError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, PetaboxError, ResourceResult, WarcResource, CdxPartial, CdxRow
 from .ingest import IngestFileWorker
-from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker
+from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker
 from .db import SandcrawlerPostgrestClient, SandcrawlerPostgresClient
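
With these re-exports in place, downstream code can import the new classes straight from the package top level, for example:

    from sandcrawler import PdfTrioClient, PdfTrioWorker, PdfTrioBlobWorker, PersistPdfTrioWorker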
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index 53d159f..ddb71a0 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -25,6 +25,15 @@ class SandcrawlerPostgrestClient:
         else:
             return None
 
+    def get_pdftrio(self, sha1):
+        resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.'+sha1))
+        resp.raise_for_status()
+        resp = resp.json()
+        if resp:
+            return resp[0]
+        else:
+            return None
+
     def get_file_meta(self, sha1):
         resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1))
         resp.raise_for_status()
@@ -176,6 +185,54 @@ class SandcrawlerPostgresClient:
         resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
         return self._inserts_and_updates(resp, on_conflict)
 
+    def insert_pdftrio(self, cur, batch, on_conflict="nothing"):
+        sql = """
+            INSERT INTO
+            pdftrio (sha1hex, updated, status_code, status, pdftrio_version,
+                     models_date, ensemble_score, bert_score, linear_score,
+                     image_score)
+            VALUES %s
+            ON CONFLICT (sha1hex) DO
+        """
+        if on_conflict.lower() == "nothing":
+            sql += " NOTHING"
+        elif on_conflict.lower() == "update":
+            sql += """ UPDATE SET
+                updated=EXCLUDED.updated,
+                status_code=EXCLUDED.status_code,
+                status=EXCLUDED.status,
+                pdftrio_version=EXCLUDED.pdftrio_version,
+                models_date=EXCLUDED.models_date,
+                ensemble_score=EXCLUDED.ensemble_score,
+                bert_score=EXCLUDED.bert_score,
+                linear_score=EXCLUDED.linear_score,
+                image_score=EXCLUDED.image_score
+            """
+        else:
+            raise NotImplementedError("on_conflict: {}".format(on_conflict))
+        sql += " RETURNING xmax;"
+        batch = [
+            (
+                d['key'],
+                d.get('updated') or datetime.datetime.now(),
+                d['status_code'],
+                d['status'],
+                d.get('versions', {}).get('pdftrio_version') or None,
+                d.get('versions', {}).get('models_date') or None,
+                d.get('ensemble_score') or None,
+                d.get('bert_score') or None,
+                d.get('linear_score') or None,
+                d.get('image_score') or None,
+            )
+            for d in batch]
+        # filter out duplicate rows by key (sha1hex)
+        batch_dict = dict()
+        for b in batch:
+            batch_dict[b[0]] = b
+        batch = list(batch_dict.values())
+        resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
+
     def insert_ingest_request(self, cur, batch, on_conflict="nothing"):
         sql = """
             INSERT INTO
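
For reference, a sketch of the per-record dict shape that insert_pdftrio() expects, matching the tuple-building above (all values here are invented; 'key' is the file sha1hex, and the nested 'versions' dict feeds the pdftrio_version and models_date columns):

    record = {
        'key': '3f2ff63b4e2a3cbcc7e6a6ec8e8d5f0b9d1c2a3b',  # sha1hex (invented)
        'status': 'success',
        'status_code': 200,
        'ensemble_score': 0.96,
        'bert_score': 0.92,
        'linear_score': 0.91,
        'image_score': 0.88,
        'versions': {
            'pdftrio_version': '0.1.0',   # invented version string
            'models_date': '2020-02-12',  # invented date
        },
    }
    # with db = SandcrawlerPostgresClient(db_url) and cur = db.conn.cursor():
    # inserts, updates = db.insert_pdftrio(cur, [record], on_conflict="update")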
diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py
new file mode 100644
index 0000000..a2eedd8
--- /dev/null
+++ b/python/sandcrawler/pdftrio.py
@@ -0,0 +1,158 @@
+
+import requests
+
+from .workers import SandcrawlerWorker
+from .misc import gen_file_metadata, requests_retry_session
+from .ia import WaybackClient, WaybackError, PetaboxError
+
+
+class PdfTrioClient(object):
+
+    def __init__(self, host_url="http://pdftrio.qa.fatcat.wiki", **kwargs):
+        self.host_url = host_url
+        self.http_session = requests_retry_session(retries=3, backoff_factor=3)
+
+    def classify_pdf(self, blob):
+        """
+        Returns a dict with at least:
+
+        - status_code (int, always set)
+        - status (success, or error-*)
+
+        On success, the other remote API JSON response keys are also included.
+
+        On HTTP-level failures, the status_code and status fields are set
+        appropriately; an optional `error_msg` may also be set. For some other
+        errors, like connection failure, an exception is raised.
+        """
+        assert blob
+
+        try:
+            # use the retry-enabled session configured in __init__
+            pdftrio_response = self.http_session.post(
+                self.host_url + "/classify/pdf/all",
+                files={
+                    'pdf': blob,
+                },
+                timeout=30.0,
+            )
+        except requests.Timeout:
+            return {
+                'status': 'error-timeout',
+                'status_code': -4,  # heritrix3 "HTTP timeout" code
+                'error_msg': 'pdftrio request (HTTP POST) timeout',
+            }
+
+        info = dict(
+            status_code=pdftrio_response.status_code,
+        )
+        if pdftrio_response.status_code == 200:
+            resp_json = pdftrio_response.json()
+            assert 'ensemble_score' in resp_json
+            assert 'status' in resp_json
+            assert 'versions' in resp_json
+            info.update(resp_json)
+        else:
+            info['status'] = 'error'
+            # TODO: might return JSON with some info?
+
+        # add this timing info at end so it isn't clobbered by an update()
+        if not info.get('timing'):
+            info['timing'] = dict()
+        info['timing']['total_sec'] = pdftrio_response.elapsed.total_seconds()
+        return info
+
+
+class PdfTrioWorker(SandcrawlerWorker):
+    """
+    This class is basically copied directly from GrobidWorker
+    """
+
+    def __init__(self, pdftrio_client, wayback_client=None, sink=None, **kwargs):
+        super().__init__()
+        self.pdftrio_client = pdftrio_client
+        self.wayback_client = wayback_client
+        self.sink = sink
+
+    def process(self, record):
+        default_key = record['sha1hex']
+        if record.get('warc_path') and record.get('warc_offset'):
+            # it's a full CDX dict; fetch using WaybackClient
+            if not self.wayback_client:
+                raise Exception("wayback client not configured for this PdfTrioWorker")
+            try:
+                blob = self.wayback_client.fetch_petabox_body(
+                    csize=record['warc_csize'],
+                    offset=record['warc_offset'],
+                    warc_path=record['warc_path'],
+                )
+            except (WaybackError, PetaboxError) as we:
+                return dict(
+                    status="error-wayback",
+                    error_msg=str(we),
+                    source=record,
+                    key=default_key,
+                )
+        elif record.get('url') and record.get('datetime'):
+            # it's a partial CDX dict or something? fetch using WaybackClient
+            if not self.wayback_client:
+                raise Exception("wayback client not configured for this PdfTrioWorker")
+            try:
+                blob = self.wayback_client.fetch_replay_body(
+                    url=record['url'],
+                    datetime=record['datetime'],
+                )
+            except WaybackError as we:
+                return dict(
+                    status="error-wayback",
+                    error_msg=str(we),
+                    source=record,
+                    key=default_key,
+                )
+        elif record.get('item') and record.get('path'):
+            # it's a petabox link; fetch via HTTP
+            resp = requests.get("https://archive.org/serve/{}/{}".format(
+                record['item'], record['path']))
+            try:
+                resp.raise_for_status()
+            except Exception as e:
+                return dict(
+                    status="error-petabox",
+                    error_msg=str(e),
+                    source=record,
+                    key=default_key,
+                )
+            blob = resp.content
+        else:
+            raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+        if not blob:
+            return dict(
+                status="error",
+                error_msg="empty blob",
+                source=record,
+                key=default_key,
+            )
+        result = self.pdftrio_client.classify_pdf(blob)
+        result['file_meta'] = gen_file_metadata(blob)
+        result['source'] = record
+        result['key'] = result['file_meta']['sha1hex']
+        return result
+
+class PdfTrioBlobWorker(SandcrawlerWorker):
+    """
+    This is sort of like PdfTrioWorker, except it receives blobs directly,
+    instead of fetching blobs from some remote store.
+    """
+
+    def __init__(self, pdftrio_client, sink=None, **kwargs):
+        super().__init__()
+        self.pdftrio_client = pdftrio_client
+        self.sink = sink
+
+    def process(self, blob):
+        if not blob:
+            return None
+        result = self.pdftrio_client.classify_pdf(blob)
+        result['file_meta'] = gen_file_metadata(blob)
+        result['key'] = result['file_meta']['sha1hex']
+        return result
+
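
A minimal sketch of exercising the new blob worker end to end, assuming a pdftrio service is reachable (the URL and input file are placeholders; BlackholeSink is already exported by the package):

    from sandcrawler import PdfTrioClient, PdfTrioBlobWorker, BlackholeSink

    client = PdfTrioClient(host_url="http://localhost:3939")  # placeholder URL
    worker = PdfTrioBlobWorker(client, sink=BlackholeSink())
    with open("paper.pdf", "rb") as f:  # placeholder input file
        result = worker.process(f.read())
    # result['key'] is the blob's sha1hex, computed via gen_file_metadata()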
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 6469940..64b2022 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -309,3 +309,24 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
         self.counts['written'] += 1
         return record
 
+
+class PersistPdfTrioWorker(SandcrawlerWorker):
+
+    def __init__(self, db_url, **kwargs):
+        super().__init__()
+        self.db = SandcrawlerPostgresClient(db_url)
+        self.cur = self.db.conn.cursor()
+
+    def process(self, record):
+        """
+        Only do batches (as transactions)
+        """
+        raise NotImplementedError
+
+    def push_batch(self, batch):
+        self.counts['total'] += len(batch)
+        resp = self.db.insert_pdftrio(self.cur, batch)
+        self.counts['insert-pdftrio'] += resp[0]
+        self.counts['update-pdftrio'] += resp[1]
+        self.db.commit()
+        return []
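
Finally, a sketch of wiring the persist worker up, assuming JsonLinePusher supports batched pushes via a batch_size option as the other persist workers use (the database URL and input path are placeholders):

    from sandcrawler import PersistPdfTrioWorker, JsonLinePusher

    worker = PersistPdfTrioWorker("postgres://sandcrawler@localhost/sandcrawler")
    with open("pdftrio_results.json") as f:  # one JSON result object per line
        pusher = JsonLinePusher(worker, f, batch_size=100)
        pusher.run()  # batches flow through push_batch(), which commits per batch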