Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--   python/sandcrawler/workers.py   60
1 file changed, 25 insertions, 35 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index d8a4016..7135f4c 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -1,4 +1,3 @@
-
 import json
 import multiprocessing.pool
 import signal
@@ -21,7 +20,6 @@ class SandcrawlerWorker(object):
     Usually these get "pushed" into by a RecordPusher. Output goes to another
     worker (pipeline-style), or defaults to stdout.
     """
-
     def __init__(self):
         self.counts = Counter()
         self.sink = None
@@ -62,9 +60,9 @@ class SandcrawlerWorker(object):
         multithreading or if signal-based timeouts are used elsewhere in the
         same process.
         """
-
         def timeout_handler(signum, frame):
             raise TimeoutError("timeout processing record")
+
         signal.signal(signal.SIGALRM, timeout_handler)
         resp = None
         signal.alarm(int(timeout))
@@ -72,7 +70,7 @@ class SandcrawlerWorker(object):
             resp = self.push_record(task, key=key)
         except TimeoutError:
             self.counts['timeout'] += 1
-            resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+            resp = self.timeout_response(task)  # pylint: disable=assignment-from-none
             # TODO: what if it is this push_record() itself that is timing out?
             if resp and self.sink:
                 self.sink.push_record(resp)
@@ -113,7 +111,6 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
     Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg,
     PDFs) from wayback, archive.org, or other sources.
     """
-
     def __init__(self, wayback_client, **kwargs):
         super().__init__(**kwargs)
         self.wayback_client = wayback_client
@@ -178,7 +175,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
                 )
             blob = resp.content
         else:
-            raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+            raise ValueError(
+                "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
         if not blob:
             return dict(
                 key=default_key,
@@ -192,8 +190,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
             blob=blob,
         )
 
-class MultiprocessWrapper(SandcrawlerWorker):
 
+class MultiprocessWrapper(SandcrawlerWorker):
     def __init__(self, worker, sink, jobs=None):
         self.counts = Counter()
         self.worker = worker
@@ -226,21 +224,21 @@ class MultiprocessWrapper(SandcrawlerWorker):
         print("Multiprocessing: {}".format(self.counts), file=sys.stderr)
         return worker_counts
 
+
 class BlackholeSink(SandcrawlerWorker):
     """
     Dummy SandcrawlerWorker. That doesn't do or process anything.
 
     Useful for tests.
     """
-
     def push_record(self, task, key=None):
         return
 
     def push_batch(self, tasks):
         return
 
-class KafkaSink(SandcrawlerWorker):
 
+class KafkaSink(SandcrawlerWorker):
     def __init__(self, kafka_hosts, produce_topic, **kwargs):
         self.sink = None
         self.counts = Counter()
@@ -249,13 +247,12 @@ class KafkaSink(SandcrawlerWorker):
 
         config = self.producer_config({
             'bootstrap.servers': kafka_hosts,
-            'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes
+            'message.max.bytes': 30000000,  # ~30 MBytes; broker is ~50 MBytes
             'api.version.request': True,
             'api.version.fallback.ms': 0,
         })
         self.producer = Producer(config)
-
 
     @staticmethod
     def _fail_fast(err, msg):
         if err is not None:
@@ -270,7 +267,7 @@ class KafkaSink(SandcrawlerWorker):
             'delivery.report.only.error': True,
             'default.topic.config': {
                 'message.timeout.ms': 30000,
-                'request.required.acks': -1, # all brokers must confirm
+                'request.required.acks': -1,  # all brokers must confirm
             }
         })
         return config
@@ -285,11 +282,7 @@ class KafkaSink(SandcrawlerWorker):
             msg = msg.encode('utf-8')
         assert type(msg) == bytes
 
-        self.producer.produce(
-            self.produce_topic,
-            msg,
-            key=key,
-            on_delivery=self._fail_fast)
+        self.producer.produce(self.produce_topic, msg, key=key, on_delivery=self._fail_fast)
         self.counts['produced'] += 1
 
         # check for errors etc
@@ -308,7 +301,6 @@ class KafkaCompressSink(KafkaSink):
     """
     Variant of KafkaSink for large documents. Used for, eg, GROBID output.
     """
-
     def producer_config(self, kafka_config):
         config = kafka_config.copy()
         config.update({
@@ -319,7 +311,7 @@ class KafkaCompressSink(KafkaSink):
             'delivery.report.only.error': True,
             'default.topic.config': {
                 'message.timeout.ms': 30000,
-                'request.required.acks': -1, # all brokers must confirm
+                'request.required.acks': -1,  # all brokers must confirm
             }
         })
         return config
@@ -330,7 +322,6 @@ class RecordPusher:
     Base class for different record sources to be pushed into workers. Pretty
     trivial interface, just wraps an importer and pushes records in to it.
     """
-
     def __init__(self, worker, **kwargs):
         self.counts = Counter()
         self.worker = worker
@@ -348,7 +339,6 @@ class RecordPusher:
 
 
 class JsonLinePusher(RecordPusher):
-
     def __init__(self, worker, json_file, **kwargs):
         self.counts = Counter()
         self.worker = worker
@@ -387,7 +377,6 @@ class JsonLinePusher(RecordPusher):
 
 
 class CdxLinePusher(RecordPusher):
-
     def __init__(self, worker, cdx_file, **kwargs):
         self.counts = Counter()
         self.worker = worker
@@ -409,7 +398,8 @@ class CdxLinePusher(RecordPusher):
             if not record:
                 self.counts['skip-parse'] += 1
                 continue
-            if self.filter_http_statuses and record['http_status'] not in self.filter_http_statuses:
+            if self.filter_http_statuses and record[
+                    'http_status'] not in self.filter_http_statuses:
                 self.counts['skip-http_status'] += 1
                 continue
             if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes:
@@ -434,7 +424,6 @@ class CdxLinePusher(RecordPusher):
 
 
 class ZipfilePusher(RecordPusher):
-
     def __init__(self, worker, zipfile_path, **kwargs):
         self.counts = Counter()
         self.worker = worker
@@ -472,8 +461,8 @@ class ZipfilePusher(RecordPusher):
         print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
         return self.counts
 
-class KafkaJsonPusher(RecordPusher):
 
+class KafkaJsonPusher(RecordPusher):
     def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
         self.counts = Counter()
         self.worker = worker
@@ -499,12 +488,11 @@ class KafkaJsonPusher(RecordPusher):
             # case where there there is one update and thousands of creates;
             # update would be lingering in worker, and if worker crashed
             # never created. Not great.
-            batch = self.consumer.consume(
-                num_messages=self.batch_size,
-                timeout=self.poll_interval)
+            batch = self.consumer.consume(num_messages=self.batch_size,
+                                          timeout=self.poll_interval)
             print("... got {} kafka messages ({}sec poll interval)".format(
-                    len(batch), self.poll_interval),
-                file=sys.stderr)
+                len(batch), self.poll_interval),
+                  file=sys.stderr)
             if not batch:
                 # TODO: could have some larger timeout here and
                 # self.worker.finish() if it's been more than, eg, a couple
@@ -541,7 +529,9 @@ class KafkaJsonPusher(RecordPusher):
                     while not done:
                         try:
                             # use timeouts; don't want kafka itself to timeout
-                            self.worker.push_record_timeout(record, key=msg.key(), timeout=self.process_timeout_sec)
+                            self.worker.push_record_timeout(record,
+                                                            key=msg.key(),
+                                                            timeout=self.process_timeout_sec)
                             break
                         except SandcrawlerBackoffError as be:
                             print("Backing off for 200 seconds: {}".format(be))
@@ -611,14 +601,14 @@ def make_kafka_consumer(hosts, consume_topic, group):
         for p in partitions:
             if p.error:
                 raise KafkaException(p.error)
-        print("Kafka partitions rebalanced: {} / {}".format(
-                consumer, partitions),
-            file=sys.stderr)
+        print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions),
+              file=sys.stderr)
 
     consumer = Consumer(conf)
     # NOTE: it's actually important that topic_name *not* be bytes (UTF-8
     # encoded)
-    consumer.subscribe([topic_name],
+    consumer.subscribe(
+        [topic_name],
         on_assign=on_rebalance,
         on_revoke=on_rebalance,
     )
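Most of the hunks above are whitespace and line-wrapping changes; the SIGALRM-based timeout in push_record_timeout() is behaviorally unchanged. For reference, a minimal standalone sketch of that pattern (function and variable names here are illustrative, not from the sandcrawler codebase; Unix-only, single-threaded use assumed):

import signal

def run_with_timeout(func, task, timeout=300):
    # Install an alarm handler that raises after `timeout` seconds. This
    # clobbers any existing SIGALRM handler, which is why the docstring in
    # the diff warns against mixing it with threads or other alarm users.
    def timeout_handler(signum, frame):
        raise TimeoutError("timeout processing record")

    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(int(timeout))
    try:
        return func(task)
    except TimeoutError:
        return None  # caller can count this as a 'timeout'
    finally:
        signal.alarm(0)  # always cancel the pending alarm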
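The KafkaSink hunks are likewise formatting-only: the producer config keys and the produce() call keep the same arguments, just reflowed. A rough standalone sketch of that produce path, assuming the confluent-kafka client these classes appear to use (broker address, topic name, and payload are placeholders):

from confluent_kafka import Producer

def fail_fast(err, msg):
    # Delivery callback: raise on any produce error instead of dropping records.
    if err is not None:
        raise Exception(err)

producer = Producer({
    'bootstrap.servers': 'localhost:9092',  # placeholder broker
    'message.max.bytes': 30000000,  # ~30 MBytes; broker is ~50 MBytes
    'api.version.request': True,
    'api.version.fallback.ms': 0,
})

producer.produce('example-topic', b'{"some": "record"}', key='example-key',
                 on_delivery=fail_fast)
producer.poll(0)   # serve delivery callbacks
producer.flush()   # block until outstanding messages are delivered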
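On the consumer side, the KafkaJsonPusher hunks only rewrap the consume() and push_record_timeout() calls; batched consumption with an explicit poll timeout stays the same. A simplified sketch of that loop, again assuming confluent-kafka, with placeholder broker, topic, group, and batch settings:

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder
    'group.id': 'example-group',            # placeholder
    'enable.auto.commit': False,            # commit manually after processing
})
consumer.subscribe(['example-topic'])

while True:
    # Fetch up to num_messages records, waiting at most `timeout` seconds.
    batch = consumer.consume(num_messages=100, timeout=5.0)
    if not batch:
        continue
    for msg in batch:
        if msg.error():
            raise Exception(msg.error())
        record = msg.value()  # raw bytes; the pusher JSON-decodes this
        # ... process record ...
    consumer.commit(asynchronous=False)  # commit offsets once the batch is done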
