From 3370f203c3652ace357eeb69bb8828d830b3596a Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 13 Feb 2020 15:28:48 -0800 Subject: move pdf_trio results back under key in JSON/Kafka --- python/sandcrawler/pdftrio.py | 28 ++++++++++++++++++++++------ python/sandcrawler/persist.py | 10 +++++++++- 2 files changed, 31 insertions(+), 7 deletions(-) (limited to 'python') diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py index e995792..5e4630b 100644 --- a/python/sandcrawler/pdftrio.py +++ b/python/sandcrawler/pdftrio.py @@ -1,4 +1,5 @@ +import time import requests from .workers import SandcrawlerWorker @@ -55,10 +56,7 @@ class PdfTrioClient(object): info['status'] = 'error' # TODO: might return JSON with some info? - # add this timing info at end so it isn't clobbered by an update() - if not info.get('timing'): - info['timing'] = dict() - info['timing']['total_sec'] = pdftrio_response.elapsed.total_seconds(), + info['_total_sec'] = pdftrio_response.elapsed.total_seconds() return info @@ -74,17 +72,22 @@ class PdfTrioWorker(SandcrawlerWorker): self.sink = sink def process(self, record): + start_process = time.time() default_key = record['sha1hex'] + wayback_sec = None + petabox_sec = None if record.get('warc_path') and record.get('warc_offset'): # it's a full CDX dict. 
fetch using WaybackClient if not self.wayback_client: raise Exception("wayback client not configured for this PdfTrioWorker") try: + start = time.time() blob = self.wayback_client.fetch_petabox_body( csize=record['warc_csize'], offset=record['warc_offset'], warc_path=record['warc_path'], ) + wayback_sec = time.time() - start except (WaybackError, PetaboxError) as we: return dict( status="error-wayback", @@ -97,10 +100,12 @@ class PdfTrioWorker(SandcrawlerWorker): if not self.wayback_client: raise Exception("wayback client not configured for this PdfTrioWorker") try: + start = time.time() blob = self.wayback_client.fetch_replay_body( url=record['url'], datetime=record['datetime'], ) + wayback_sec = time.time() - start except WaybackError as we: return dict( status="error-wayback", @@ -110,8 +115,10 @@ class PdfTrioWorker(SandcrawlerWorker): ) elif record.get('item') and record.get('path'): # it's petabox link; fetch via HTTP + start = time.time() resp = requests.get("https://archive.org/serve/{}/{}".format( record['item'], record['path'])) + petabox_sec = time.time() - start try: resp.raise_for_status() except Exception as e: @@ -131,10 +138,19 @@ class PdfTrioWorker(SandcrawlerWorker): source=record, key=default_key, ) - result = self.pdftrio_client.classify_pdf(blob) + result = dict() result['file_meta'] = gen_file_metadata(blob) + result['key'] = result['file_meta']['sha1hex'] + result['pdf_trio'] = self.pdftrio_client.classify_pdf(blob) result['source'] = record - result['key'] = result['file_meta']['sha1hex'] + result['timing'] = dict( + pdftrio_sec=result['pdf_trio'].pop('_total_sec'), + total_sec=time.time() - start_process, + ) + if wayback_sec: + result['timing']['wayback_sec'] = wayback_sec + if petabox_sec: + result['timing']['petabox_sec'] = petabox_sec return result class PdfTrioBlobWorker(SandcrawlerWorker): diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index 64b2022..bfd8247 100644 --- a/python/sandcrawler/persist.py +++
b/python/sandcrawler/persist.py @@ -325,8 +325,16 @@ class PersistPdfTrioWorker(SandcrawlerWorker): def push_batch(self, batch): self.counts['total'] += len(batch) - resp = self.db.insert_pdftrio(self.cur, batch) + + pdftrio_batch = [r['pdf_trio'] for r in batch] + resp = self.db.insert_pdftrio(self.cur, pdftrio_batch) self.counts['insert-pdftrio'] += resp[0] self.counts['update-pdftrio'] += resp[1] + + file_meta_batch = [r['file_meta'] for r in batch if r['pdf_trio']['status'] == "success" and r.get('file_meta')] + resp = self.db.insert_file_meta(self.cur, file_meta_batch) + self.counts['insert-file-meta'] += resp[0] + self.counts['update-file-meta'] += resp[1] + self.db.commit() return [] -- cgit v1.2.3