From 3287f08a788107815f366019060a7cbcfe9505d2 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 17 Jun 2020 11:10:36 -0700
Subject: workers: refactor to pass key to process()

---
 python/sandcrawler/workers.py | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

(limited to 'python/sandcrawler/workers.py')

diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 208b0e2..8115ee3 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -26,12 +26,12 @@ class SandcrawlerWorker(object):
         self.sink = None
         # TODO: self.counters
 
-    def push_record(self, task):
+    def push_record(self, task, key=None):
         self.counts['total'] += 1
         if not self.want(task):
             self.counts['skip'] += 1
             return
-        result = self.process(task)
+        result = self.process(task, key=key)
         if not result:
             self.counts['failed'] += 1
             return
@@ -53,7 +53,7 @@ class SandcrawlerWorker(object):
         """
         return None
 
-    def push_record_timeout(self, task, timeout=300):
+    def push_record_timeout(self, task, key=None, timeout=300):
         """
         A wrapper around self.push_record which sets a timeout.
 
@@ -68,7 +68,7 @@ class SandcrawlerWorker(object):
         resp = None
         signal.alarm(int(timeout))
         try:
-            resp = self.push_record(task)
+            resp = self.push_record(task, key=key)
         except TimeoutError:
             self.counts['timeout'] += 1
             resp = self.timeout_response(task) # pylint: disable=assignment-from-none
@@ -100,7 +100,7 @@ class SandcrawlerWorker(object):
         """
         return True
 
-    def process(self, task):
+    def process(self, task, key=None):
         """
         Derived workers need to implement business logic here.
         """
@@ -483,6 +483,7 @@ class KafkaJsonPusher(RecordPusher):
             group,
         )
         self.push_batches = kwargs.get('push_batches', False)
+        self.raw_records = kwargs.get('raw_records', False)
         self.poll_interval = kwargs.get('poll_interval', 5.0)
         self.batch_size = kwargs.get('batch_size', 100)
         if self.batch_size in (0, 1):
@@ -522,7 +523,14 @@ class KafkaJsonPusher(RecordPusher):
             else:
                 for msg in batch:
                     self.counts['total'] += 1
-                    record = json.loads(msg.value().decode('utf-8'))
+                    if self.raw_records:
+                        # In this mode, pass the Kafka message as bytes through
+                        # without decoding as JSON. Eg, for thumbnails (where
+                        # message bytes are JPEG, and we need the sha1hex key
+                        # from the message)
+                        record = msg
+                    else:
+                        record = json.loads(msg.value().decode('utf-8'))
                     # This complex bit of code implements backoff/backpressure
                     # in a way that will not cause this Kafka consumer to lose
                     # partition assignments (resulting in a rebalance). This
@@ -532,7 +540,7 @@ class KafkaJsonPusher(RecordPusher):
                     while not done:
                         try:
                             # use timeouts; don't want kafka itself to timeout
-                            self.worker.push_record_timeout(record, timeout=300)
+                            self.worker.push_record_timeout(record, key=msg.key(), timeout=300)
                             break
                         except SandcrawlerBackoffError as be:
                             print("Backing off for 200 seconds: {}".format(be))
--
cgit v1.2.3
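
A minimal, self-contained sketch (not part of the patch above) of how the new key argument is meant to flow: the pusher reads the key off the Kafka message and hands it to the worker alongside the record, which matters in raw_records mode where the payload itself (e.g. JPEG bytes) carries no identifier. The class names MiniWorker, ThumbnailWorker, and FakeMessage are hypothetical stand-ins, not sandcrawler code; only the push_record(task, key=...) / process(task, key=...) shape mirrors the patch.

    class MiniWorker:
        """Mirrors the SandcrawlerWorker pattern: push_record(task, key=...) calls process(task, key=...)."""

        def __init__(self):
            self.counts = {'total': 0}

        def push_record(self, task, key=None):
            self.counts['total'] += 1
            return self.process(task, key=key)

        def process(self, task, key=None):
            raise NotImplementedError


    class ThumbnailWorker(MiniWorker):
        """Hypothetical raw_records-style consumer: task is the raw Kafka message
        (value() is JPEG bytes); the only identifier is the message key (a sha1hex)."""

        def process(self, task, key=None):
            if key is None:
                return None
            # A real worker would persist task.value() under the key; here we
            # just report what was received.
            return {'sha1hex': key, 'size_bytes': len(task.value())}


    class FakeMessage:
        """Stand-in for a Kafka message: value() returns payload bytes, key() the message key."""

        def __init__(self, key, value):
            self._key = key
            self._value = value

        def key(self):
            return self._key

        def value(self):
            return self._value


    if __name__ == '__main__':
        worker = ThumbnailWorker()
        msg = FakeMessage(
            key='a94a8fe5ccb19ba61c4c0873d391e987982fbbd3',  # example sha1hex
            value=b'\xff\xd8\xff\xe0 fake jpeg bytes',
        )
        # As in the patched KafkaJsonPusher loop: pass the message through
        # untouched and supply the key separately via msg.key().
        print(worker.push_record(msg, key=msg.key()))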