author    Bryan Newbold <bnewbold@archive.org>  2020-02-13 15:28:48 -0800
committer Bryan Newbold <bnewbold@archive.org>  2020-02-13 15:28:48 -0800
commit    3370f203c3652ace357eeb69bb8828d830b3596a (patch)
tree      e283ade7600932b84605b84c852da01c2cd2dbdb
parent    4aec6410c2318972240ded2bce5f68706aae18df (diff)
move pdf_trio results back under key in JSON/Kafka
-rw-r--r--  proposals/20200207_pdftrio.md  | 33
-rw-r--r--  python/sandcrawler/pdftrio.py  | 28
-rw-r--r--  python/sandcrawler/persist.py  | 10
3 files changed, 49 insertions(+), 22 deletions(-)
diff --git a/proposals/20200207_pdftrio.md b/proposals/20200207_pdftrio.md
index 7ad5142..31a2db6 100644
--- a/proposals/20200207_pdftrio.md
+++ b/proposals/20200207_pdftrio.md
@@ -58,24 +58,27 @@ Basically just like GROBID client for now. Requests, JSON.
Output that goes in Kafka topic:
key (sha1hex)
- status
- status_code
- ensemble_score
- bert_score
- image_score
- linear_score
- versions
- pdftrio_version (string)
- models_date (string, ISO date)
- git_rev (string)
- bert_model (string)
- image_model (string)
- linear_model (string)
- timing (might be added?)
- ...
+ pdf_trio
+ status
+ status_code
+ ensemble_score
+ bert_score
+ image_score
+ linear_score
+ versions
+ pdftrio_version (string)
+ models_date (string, ISO date)
+ git_rev (string)
+ bert_model (string)
+ image_model (string)
+ linear_model (string)
+ timing (optional/future: as reported by API)
+ ...
file_meta
sha1hex
...
+ timing
+ ...
## SQL Schema
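For reference, a minimal sketch of one message on the output Kafka topic after this change, with the classifier fields nested under pdf_trio rather than sitting at the top level (field names follow the schema above; the values are placeholders, not real output):

    # Sketch only: placeholder values, field names from the proposal schema above.
    example_msg = {
        "key": "<sha1hex>",
        "pdf_trio": {
            "status": "success",
            "status_code": 200,
            "ensemble_score": 0.95,
            "bert_score": 0.92,
            "image_score": 0.96,
            "linear_score": 0.90,
            "versions": {
                "pdftrio_version": "<string>",
                "models_date": "<ISO date>",
                "git_rev": "<string>",
                "bert_model": "<string>",
                "image_model": "<string>",
                "linear_model": "<string>",
            },
            # timing as reported by the API may be added here in the future
        },
        "file_meta": {
            "sha1hex": "<sha1hex>",
            # ... other file metadata fields
        },
        "timing": {
            # request/processing timing added by the worker (see pdftrio.py below)
        },
    }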
diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py
index e995792..5e4630b 100644
--- a/python/sandcrawler/pdftrio.py
+++ b/python/sandcrawler/pdftrio.py
@@ -1,4 +1,5 @@
+import time
import requests
from .workers import SandcrawlerWorker
@@ -55,10 +56,7 @@ class PdfTrioClient(object):
info['status'] = 'error'
# TODO: might return JSON with some info?
- # add this timing info at end so it isn't clobbered by an update()
- if not info.get('timing'):
- info['timing'] = dict()
- info['timing']['total_sec'] = pdftrio_response.elapsed.total_seconds(),
+ info['_total_sec'] = pdftrio_response.elapsed.total_seconds()
return info
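The earlier approach stored timing under info['timing'] (and had a stray trailing comma that turned total_sec into a one-element tuple); the new private _total_sec key is a temporary hand-off that the worker pops into its own timing dict so it cannot be clobbered by an update() on the classifier fields. A rough sketch of the client's return value after this change (placeholder values, not real output):

    # Sketch of what classify_pdf() returns after this change.
    info = {
        "status": "success",
        "status_code": 200,
        "ensemble_score": 0.95,
        # ... other score and version fields ...
        "_total_sec": 1.23,  # popped out by PdfTrioWorker.process() below
    }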
@@ -74,17 +72,22 @@ class PdfTrioWorker(SandcrawlerWorker):
self.sink = sink
def process(self, record):
+ start_process = time.time()
default_key = record['sha1hex']
+ wayback_sec = None
+ petabox_sec = None
if record.get('warc_path') and record.get('warc_offset'):
# it's a full CDX dict. fetch using WaybackClient
if not self.wayback_client:
raise Exception("wayback client not configured for this PdfTrioWorker")
try:
+ start = time.time()
blob = self.wayback_client.fetch_petabox_body(
csize=record['warc_csize'],
offset=record['warc_offset'],
warc_path=record['warc_path'],
)
+ wayback_sec = time.time() - start
except (WaybackError, PetaboxError) as we:
return dict(
status="error-wayback",
@@ -97,10 +100,12 @@ class PdfTrioWorker(SandcrawlerWorker):
if not self.wayback_client:
raise Exception("wayback client not configured for this PdfTrioWorker")
try:
+ start = time.time()
blob = self.wayback_client.fetch_replay_body(
url=record['url'],
datetime=record['datetime'],
)
+ wayback_sec = time.time() - start
except WaybackError as we:
return dict(
status="error-wayback",
@@ -110,8 +115,10 @@ class PdfTrioWorker(SandcrawlerWorker):
)
elif record.get('item') and record.get('path'):
# it's petabox link; fetch via HTTP
+ start = time.time()
resp = requests.get("https://archive.org/serve/{}/{}".format(
record['item'], record['path']))
+ petabox_sec = time.time() - start
try:
resp.raise_for_status()
except Exception as e:
@@ -131,10 +138,19 @@ class PdfTrioWorker(SandcrawlerWorker):
source=record,
key=default_key,
)
- result = self.pdftrio_client.classify_pdf(blob)
+ result = dict()
result['file_meta'] = gen_file_metadata(blob)
+ result['key'] = result['file_meta']['sha1hex']
+ result['pdf_trio'] = self.pdftrio_client.classify_pdf(blob)
result['source'] = record
- result['key'] = result['file_meta']['sha1hex']
+ result['timing'] = dict(
+ pdftrio_sec=result['pdf_trio'].pop('_total_sec'),
+ total_sec=time.time() - start_process,
+ )
+ if wayback_sec:
+ result['timing']['wayback_sec'] = wayback_sec
+ if petabox_sec:
+ result['timing']['petabox_sec'] = petabox_sec
return result
class PdfTrioBlobWorker(SandcrawlerWorker):
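A quick sanity check, as a sketch, of the nesting this commit restores: classifier fields live under pdf_trio, while key, file_meta, source, and timing stay at the top level of the worker result (check_record() and its assertions are illustrative, not part of the codebase):

    def check_record(record: dict):
        # Illustrative assertions only; not part of sandcrawler itself.
        assert "ensemble_score" not in record          # no longer top-level
        assert "ensemble_score" in record["pdf_trio"]  # nested under pdf_trio
        assert record["key"] == record["file_meta"]["sha1hex"]
        assert "pdftrio_sec" in record["timing"]
        assert "total_sec" in record["timing"]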
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 64b2022..bfd8247 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -325,8 +325,16 @@ class PersistPdfTrioWorker(SandcrawlerWorker):
def push_batch(self, batch):
self.counts['total'] += len(batch)
- resp = self.db.insert_pdftrio(self.cur, batch)
+
+ pdftrio_batch = [r['pdf_trio'] for r in batch]
+ resp = self.db.insert_pdftrio(self.cur, pdftrio_batch)
self.counts['insert-pdftrio'] += resp[0]
self.counts['update-pdftrio'] += resp[1]
+
+ file_meta_batch = [r['file_meta'] for r in batch if r['pdf_trio']['status'] == "success" and r.get('file_meta')]
+ resp = self.db.insert_file_meta(self.cur, file_meta_batch)
+ self.counts['insert-file-meta'] += resp[0]
+ self.counts['update-file-meta'] += resp[1]
+
self.db.commit()
return []
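Finally, a toy sketch (no database, hypothetical records) of the batch splitting that push_batch() now does: every record's pdf_trio sub-dict is persisted, but a file_meta row is only inserted when classification succeeded and metadata is present:

    # Toy batch; the insert_pdftrio()/insert_file_meta() calls are elided.
    batch = [
        {"pdf_trio": {"status": "success", "ensemble_score": 0.95},
         "file_meta": {"sha1hex": "<sha1hex>"}},
        {"pdf_trio": {"status": "error"}},  # no file_meta row persisted
    ]

    pdftrio_batch = [r["pdf_trio"] for r in batch]
    file_meta_batch = [
        r["file_meta"]
        for r in batch
        if r["pdf_trio"]["status"] == "success" and r.get("file_meta")
    ]

    assert len(pdftrio_batch) == 2
    assert len(file_meta_batch) == 1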