From 3287f08a788107815f366019060a7cbcfe9505d2 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 17 Jun 2020 11:10:36 -0700 Subject: workers: refactor to pass key to process() --- python/sandcrawler/grobid.py | 4 ++-- python/sandcrawler/ingest.py | 4 ++-- python/sandcrawler/pdf.py | 2 +- python/sandcrawler/pdftrio.py | 4 ++-- python/sandcrawler/persist.py | 12 ++++++------ python/sandcrawler/workers.py | 22 +++++++++++++++------- 6 files changed, 28 insertions(+), 20 deletions(-) diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py index d9db6c3..11623c5 100644 --- a/python/sandcrawler/grobid.py +++ b/python/sandcrawler/grobid.py @@ -95,7 +95,7 @@ class GrobidWorker(SandcrawlerFetchWorker): key=default_key, ) - def process(self, record): + def process(self, record, key=None): default_key = record['sha1hex'] fetch_result = self.fetch_blob(record) @@ -121,7 +121,7 @@ class GrobidBlobWorker(SandcrawlerWorker): self.sink = sink self.consolidate_mode = 2 - def process(self, blob): + def process(self, blob, key=None): if not blob: return None result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode) diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py index 82b43fe..f4e78e4 100644 --- a/python/sandcrawler/ingest.py +++ b/python/sandcrawler/ingest.py @@ -23,7 +23,7 @@ class IngestFileWorker(SandcrawlerWorker): but is an HTML 200, treats it as a landing page, tries to extract fulltext link, then fetches that resource. - process(request) -> response + process(request, key=None) -> response Does all the things! 
Check existing processing (short circuit): @@ -243,7 +243,7 @@ class IngestFileWorker(SandcrawlerWorker): return False return True - def process(self, request): + def process(self, request, key=None): # backwards compatibility if request.get('ingest_type') in ('file', None): diff --git a/python/sandcrawler/pdf.py b/python/sandcrawler/pdf.py index fb72dfe..b55e2bb 100644 --- a/python/sandcrawler/pdf.py +++ b/python/sandcrawler/pdf.py @@ -125,7 +125,7 @@ class PdfExtractWorker(SandcrawlerFetchWorker): sha1hex=default_key, ) - def process(self, record) -> dict: + def process(self, record, key: Optional[str] = None): default_key = record['sha1hex'] fetch_result = self.fetch_blob(record) diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py index 14d8d04..c65b6c8 100644 --- a/python/sandcrawler/pdftrio.py +++ b/python/sandcrawler/pdftrio.py @@ -78,7 +78,7 @@ class PdfTrioWorker(SandcrawlerFetchWorker): self.pdftrio_client = pdftrio_client self.sink = sink - def process(self, record): + def process(self, record, key=None): start_process = time.time() default_key = record['sha1hex'] fetch_sec = None @@ -115,7 +115,7 @@ class PdfTrioBlobWorker(SandcrawlerWorker): self.sink = sink self.mode = mode - def process(self, blob): + def process(self, blob, key=None): start_process = time.time() if not blob: return None diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index f2a4893..338cdfc 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -35,7 +35,7 @@ class PersistCdxWorker(SandcrawlerWorker): self.db = SandcrawlerPostgresClient(db_url) self.cur = self.db.conn.cursor() - def process(self, record): + def process(self, record, key=None): """ Only do batches (as transactions) """ @@ -60,7 +60,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): self.db = SandcrawlerPostgresClient(db_url) self.cur = self.db.conn.cursor() - def process(self, record): + def process(self, record, key=None): """ Only 
do batches (as transactions) """ @@ -203,7 +203,7 @@ class PersistIngestRequestWorker(PersistIngestFileResultWorker): def __init__(self, db_url, **kwargs): super().__init__(db_url=db_url) - def process(self, record): + def process(self, record, key=None): """ Only do batches (as transactions) """ @@ -243,7 +243,7 @@ class PersistGrobidWorker(SandcrawlerWorker): self.db_only = kwargs.get('db_only', False) assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed" - def process(self, record): + def process(self, record, key=None): """ Only do batches (as transactions) """ @@ -327,7 +327,7 @@ class PersistGrobidDiskWorker(SandcrawlerWorker): ) return obj_path - def process(self, record): + def process(self, record, key=None): if record.get('status_code') != 200 or not record.get('tei_xml'): return False @@ -347,7 +347,7 @@ class PersistPdfTrioWorker(SandcrawlerWorker): self.db = SandcrawlerPostgresClient(db_url) self.cur = self.db.conn.cursor() - def process(self, record): + def process(self, record, key=None): """ Only do batches (as transactions) """ diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 208b0e2..8115ee3 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -26,12 +26,12 @@ class SandcrawlerWorker(object): self.sink = None # TODO: self.counters - def push_record(self, task): + def push_record(self, task, key=None): self.counts['total'] += 1 if not self.want(task): self.counts['skip'] += 1 return - result = self.process(task) + result = self.process(task, key=key) if not result: self.counts['failed'] += 1 return @@ -53,7 +53,7 @@ class SandcrawlerWorker(object): """ return None - def push_record_timeout(self, task, timeout=300): + def push_record_timeout(self, task, key=None, timeout=300): """ A wrapper around self.push_record which sets a timeout. 
@@ -68,7 +68,7 @@ class SandcrawlerWorker(object): resp = None signal.alarm(int(timeout)) try: - resp = self.push_record(task) + resp = self.push_record(task, key=key) except TimeoutError: self.counts['timeout'] += 1 resp = self.timeout_response(task) # pylint: disable=assignment-from-none @@ -100,7 +100,7 @@ class SandcrawlerWorker(object): """ return True - def process(self, task): + def process(self, task, key=None): """ Derived workers need to implement business logic here. """ @@ -483,6 +483,7 @@ class KafkaJsonPusher(RecordPusher): group, ) self.push_batches = kwargs.get('push_batches', False) + self.raw_records = kwargs.get('raw_records', False) self.poll_interval = kwargs.get('poll_interval', 5.0) self.batch_size = kwargs.get('batch_size', 100) if self.batch_size in (0, 1): @@ -522,7 +523,14 @@ class KafkaJsonPusher(RecordPusher): else: for msg in batch: self.counts['total'] += 1 - record = json.loads(msg.value().decode('utf-8')) + if self.raw_records: + # In this mode, pass the Kafka message as bytes through + # without decoding as JSON. Eg, for thumbnails (where + # message bytes are JPEG, and we need the sha1hex key + # from the message) + record = msg + else: + record = json.loads(msg.value().decode('utf-8')) # This complex bit of code implements backoff/backpressure # in a way that will not cause this Kafka consumer to lose # partition assignments (resulting in a rebalance). This @@ -532,7 +540,7 @@ class KafkaJsonPusher(RecordPusher): while not done: try: # use timeouts; don't want kafka itself to timeout - self.worker.push_record_timeout(record, key=msg.key(), timeout=300) + self.worker.push_record_timeout(record, key=msg.key(), timeout=300) break except SandcrawlerBackoffError as be: print("Backing off for 200 seconds: {}".format(be)) -- cgit v1.2.3