aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/workers.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-06-17 11:10:36 -0700
committerBryan Newbold <bnewbold@archive.org>2020-06-17 11:10:36 -0700
commit3287f08a788107815f366019060a7cbcfe9505d2 (patch)
tree29a867d2cf84d116b26be37508d4ea6462dede88 /python/sandcrawler/workers.py
parent5a6bf449ac78586bf150216fe2310be178eeb6c3 (diff)
downloadsandcrawler-3287f08a788107815f366019060a7cbcfe9505d2.tar.gz
sandcrawler-3287f08a788107815f366019060a7cbcfe9505d2.zip
workers: refactor to pass key to process()
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--python/sandcrawler/workers.py22
1 files changed, 15 insertions, 7 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 208b0e2..8115ee3 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -26,12 +26,12 @@ class SandcrawlerWorker(object):
self.sink = None
# TODO: self.counters
- def push_record(self, task):
+ def push_record(self, task, key=None):
self.counts['total'] += 1
if not self.want(task):
self.counts['skip'] += 1
return
- result = self.process(task)
+ result = self.process(task, key=key)
if not result:
self.counts['failed'] += 1
return
@@ -53,7 +53,7 @@ class SandcrawlerWorker(object):
"""
return None
- def push_record_timeout(self, task, timeout=300):
+ def push_record_timeout(self, task, key=None, timeout=300):
"""
A wrapper around self.push_record which sets a timeout.
@@ -68,7 +68,7 @@ class SandcrawlerWorker(object):
resp = None
signal.alarm(int(timeout))
try:
- resp = self.push_record(task)
+ resp = self.push_record(task, key=key)
except TimeoutError:
self.counts['timeout'] += 1
resp = self.timeout_response(task) # pylint: disable=assignment-from-none
@@ -100,7 +100,7 @@ class SandcrawlerWorker(object):
"""
return True
- def process(self, task):
+ def process(self, task, key=None):
"""
Derived workers need to implement business logic here.
"""
@@ -483,6 +483,7 @@ class KafkaJsonPusher(RecordPusher):
group,
)
self.push_batches = kwargs.get('push_batches', False)
+ self.raw_records = kwargs.get('raw_records', False)
self.poll_interval = kwargs.get('poll_interval', 5.0)
self.batch_size = kwargs.get('batch_size', 100)
if self.batch_size in (0, 1):
@@ -522,7 +523,14 @@ class KafkaJsonPusher(RecordPusher):
else:
for msg in batch:
self.counts['total'] += 1
- record = json.loads(msg.value().decode('utf-8'))
+ if self.raw_records:
+ # In this mode, pass the Kafka message as bytes through
+ # without decoding as JSON. Eg, for thumbnails (where
+ # message bytes are JPEG, and we need # the sha1hex key
+ # from the message)
+ record = msg
+ else:
+ record = json.loads(msg.value().decode('utf-8'))
# This complex bit of code implements backoff/backpressure
# in a way that will not cause this Kafka consumer to lose
# partition assignments (resulting in a rebalance). This
@@ -532,7 +540,7 @@ class KafkaJsonPusher(RecordPusher):
while not done:
try:
# use timeouts; don't want kafka itself to timeout
- self.worker.push_record_timeout(record, timeout=300)
+ self.worker.push_record_timeout(record, key=msg.key(), timeout=300)
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))