author     Bryan Newbold <bnewbold@archive.org>  2020-06-17 11:10:36 -0700
committer  Bryan Newbold <bnewbold@archive.org>  2020-06-17 11:10:36 -0700
commit     3287f08a788107815f366019060a7cbcfe9505d2 (patch)
tree       29a867d2cf84d116b26be37508d4ea6462dede88
parent     5a6bf449ac78586bf150216fe2310be178eeb6c3 (diff)
workers: refactor to pass key to process()
-rw-r--r--  python/sandcrawler/grobid.py   |  4
-rw-r--r--  python/sandcrawler/ingest.py   |  4
-rw-r--r--  python/sandcrawler/pdf.py      |  2
-rw-r--r--  python/sandcrawler/pdftrio.py  |  4
-rw-r--r--  python/sandcrawler/persist.py  | 12
-rw-r--r--  python/sandcrawler/workers.py  | 22
6 files changed, 28 insertions(+), 20 deletions(-)
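Taken together, the refactor threads the Kafka message key from the pusher down into each worker's process() implementation; because the new argument defaults to None, callers that never had a key are unaffected. The call chain, condensed from the hunks below:

    KafkaJsonPusher.run():
        self.worker.push_record_timeout(record, key=msg.key(), timeout=300)
    SandcrawlerWorker.push_record_timeout():
        resp = self.push_record(task, key=key)
    SandcrawlerWorker.push_record():
        result = self.process(task, key=key)
    <worker subclass>.process(task, key=None):
        # business logic; key is the Kafka message key, or None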
diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py
index d9db6c3..11623c5 100644
--- a/python/sandcrawler/grobid.py
+++ b/python/sandcrawler/grobid.py
@@ -95,7 +95,7 @@ class GrobidWorker(SandcrawlerFetchWorker):
key=default_key,
)
- def process(self, record):
+ def process(self, record, key=None):
default_key = record['sha1hex']
fetch_result = self.fetch_blob(record)
@@ -121,7 +121,7 @@ class GrobidBlobWorker(SandcrawlerWorker):
self.sink = sink
self.consolidate_mode = 2
- def process(self, blob):
+ def process(self, blob, key=None):
if not blob:
return None
result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode)
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index 82b43fe..f4e78e4 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -23,7 +23,7 @@ class IngestFileWorker(SandcrawlerWorker):
but is an HTML 200, treats it as a landing page, tries to extract
fulltext link, then fetches that resource.
- process(request) -> response
+ process(request, key=None) -> response
Does all the things!
Check existing processing (short circuit):
@@ -243,7 +243,7 @@ class IngestFileWorker(SandcrawlerWorker):
return False
return True
- def process(self, request):
+ def process(self, request, key=None):
# backwards compatibility
if request.get('ingest_type') in ('file', None):
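Because the new parameter defaults to None, direct callers of IngestFileWorker.process() keep working unchanged; only Kafka-driven callers pass a key. A rough usage sketch, assuming a bare constructor and a minimal request (both illustrative, not a complete ingest request):

    worker = IngestFileWorker()         # real deployments also pass a sink and API clients
    request = {
        'ingest_type': 'file',          # or omitted; treated as 'file' for backwards compatibility
        'base_url': 'https://example.com/paper.pdf',
    }
    response = worker.process(request)  # 'key' is optional and defaults to None
    print(response.get('status'))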
diff --git a/python/sandcrawler/pdf.py b/python/sandcrawler/pdf.py
index fb72dfe..b55e2bb 100644
--- a/python/sandcrawler/pdf.py
+++ b/python/sandcrawler/pdf.py
@@ -125,7 +125,7 @@ class PdfExtractWorker(SandcrawlerFetchWorker):
sha1hex=default_key,
)
- def process(self, record) -> dict:
+ def process(self, record, key: Optional[str] = None):
default_key = record['sha1hex']
fetch_result = self.fetch_blob(record)
diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py
index 14d8d04..c65b6c8 100644
--- a/python/sandcrawler/pdftrio.py
+++ b/python/sandcrawler/pdftrio.py
@@ -78,7 +78,7 @@ class PdfTrioWorker(SandcrawlerFetchWorker):
self.pdftrio_client = pdftrio_client
self.sink = sink
- def process(self, record):
+ def process(self, record, key=None):
start_process = time.time()
default_key = record['sha1hex']
fetch_sec = None
@@ -115,7 +115,7 @@ class PdfTrioBlobWorker(SandcrawlerWorker):
self.sink = sink
self.mode = mode
- def process(self, blob):
+ def process(self, blob, key=None):
start_process = time.time()
if not blob:
return None
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index f2a4893..338cdfc 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -35,7 +35,7 @@ class PersistCdxWorker(SandcrawlerWorker):
self.db = SandcrawlerPostgresClient(db_url)
self.cur = self.db.conn.cursor()
- def process(self, record):
+ def process(self, record, key=None):
"""
Only do batches (as transactions)
"""
@@ -60,7 +60,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
self.db = SandcrawlerPostgresClient(db_url)
self.cur = self.db.conn.cursor()
- def process(self, record):
+ def process(self, record, key=None):
"""
Only do batches (as transactions)
"""
@@ -203,7 +203,7 @@ class PersistIngestRequestWorker(PersistIngestFileResultWorker):
def __init__(self, db_url, **kwargs):
super().__init__(db_url=db_url)
- def process(self, record):
+ def process(self, record, key=None):
"""
Only do batches (as transactions)
"""
@@ -243,7 +243,7 @@ class PersistGrobidWorker(SandcrawlerWorker):
self.db_only = kwargs.get('db_only', False)
assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed"
- def process(self, record):
+ def process(self, record, key=None):
"""
Only do batches (as transactions)
"""
@@ -327,7 +327,7 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
)
return obj_path
- def process(self, record):
+ def process(self, record, key=None):
if record.get('status_code') != 200 or not record.get('tei_xml'):
return False
@@ -347,7 +347,7 @@ class PersistPdfTrioWorker(SandcrawlerWorker):
self.db = SandcrawlerPostgresClient(db_url)
self.cur = self.db.conn.cursor()
- def process(self, record):
+ def process(self, record, key=None):
"""
Only do batches (as transactions)
"""
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 208b0e2..8115ee3 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -26,12 +26,12 @@ class SandcrawlerWorker(object):
self.sink = None
# TODO: self.counters
- def push_record(self, task):
+ def push_record(self, task, key=None):
self.counts['total'] += 1
if not self.want(task):
self.counts['skip'] += 1
return
- result = self.process(task)
+ result = self.process(task, key=key)
if not result:
self.counts['failed'] += 1
return
@@ -53,7 +53,7 @@ class SandcrawlerWorker(object):
"""
return None
- def push_record_timeout(self, task, timeout=300):
+ def push_record_timeout(self, task, key=None, timeout=300):
"""
A wrapper around self.push_record which sets a timeout.
@@ -68,7 +68,7 @@ class SandcrawlerWorker(object):
resp = None
signal.alarm(int(timeout))
try:
- resp = self.push_record(task)
+ resp = self.push_record(task, key=key)
except TimeoutError:
self.counts['timeout'] += 1
resp = self.timeout_response(task) # pylint: disable=assignment-from-none
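push_record_timeout() bounds how long a single record may take by arming SIGALRM before delegating to push_record(); a TimeoutError raised by the signal handler increments the 'timeout' counter and falls back to timeout_response(). A standalone, runnable illustration of that mechanism (the helper name is made up; this shows the pattern, not the worker method itself):

    import signal

    def run_with_alarm_timeout(fn, arg, timeout=300):
        # Unix-only, main-thread-only: SIGALRM interrupts fn(arg) after `timeout` seconds
        def handler(signum, frame):
            raise TimeoutError("timeout processing record")

        signal.signal(signal.SIGALRM, handler)
        signal.alarm(int(timeout))
        try:
            return fn(arg)
        except TimeoutError:
            return None      # the worker counts this and substitutes timeout_response()
        finally:
            signal.alarm(0)  # always disarm the alarm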
@@ -100,7 +100,7 @@ class SandcrawlerWorker(object):
"""
return True
- def process(self, task):
+ def process(self, task, key=None):
"""
Derived workers need to implement business logic here.
"""
@@ -483,6 +483,7 @@ class KafkaJsonPusher(RecordPusher):
group,
)
self.push_batches = kwargs.get('push_batches', False)
+ self.raw_records = kwargs.get('raw_records', False)
self.poll_interval = kwargs.get('poll_interval', 5.0)
self.batch_size = kwargs.get('batch_size', 100)
if self.batch_size in (0, 1):
@@ -522,7 +523,14 @@ class KafkaJsonPusher(RecordPusher):
else:
for msg in batch:
self.counts['total'] += 1
- record = json.loads(msg.value().decode('utf-8'))
+ if self.raw_records:
+ # In this mode, pass the Kafka message as bytes through
+ # without decoding as JSON. Eg, for thumbnails (where
+ # message bytes are JPEG, and we need the sha1hex key
+ # from the message)
+ record = msg
+ else:
+ record = json.loads(msg.value().decode('utf-8'))
# This complex bit of code implements backoff/backpressure
# in a way that will not cause this Kafka consumer to lose
# partition assignments (resulting in a rebalance). This
@@ -532,7 +540,7 @@ class KafkaJsonPusher(RecordPusher):
while not done:
try:
# use timeouts; don't want kafka itself to timeout
- self.worker.push_record_timeout(record, timeout=300)
+ self.worker.push_record_timeout(record, key=msg.key(), timeout=300)
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))