Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r-- | python/sandcrawler/workers.py | 60
1 file changed, 25 insertions, 35 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index d8a4016..7135f4c 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -1,4 +1,3 @@
-
 import json
 import multiprocessing.pool
 import signal
@@ -21,7 +20,6 @@ class SandcrawlerWorker(object):
     Usually these get "pushed" into by a RecordPusher. Output goes to
     another worker (pipeline-style), or defaults to stdout.
     """
-
     def __init__(self):
         self.counts = Counter()
         self.sink = None
@@ -62,9 +60,9 @@ class SandcrawlerWorker(object):
         multithreading or if signal-based timeouts are used elsewhere in the
         same process.
         """
-
         def timeout_handler(signum, frame):
             raise TimeoutError("timeout processing record")
+
         signal.signal(signal.SIGALRM, timeout_handler)
         resp = None
         signal.alarm(int(timeout))
@@ -72,7 +70,7 @@ class SandcrawlerWorker(object):
             resp = self.push_record(task, key=key)
         except TimeoutError:
             self.counts['timeout'] += 1
-            resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+            resp = self.timeout_response(task)  # pylint: disable=assignment-from-none
             # TODO: what if it is this push_record() itself that is timing out?
         if resp and self.sink:
             self.sink.push_record(resp)
@@ -113,7 +111,6 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
     Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg,
     PDFs) from wayback, archive.org, or other sources.
     """
-
     def __init__(self, wayback_client, **kwargs):
         super().__init__(**kwargs)
         self.wayback_client = wayback_client
@@ -178,7 +175,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
             )
             blob = resp.content
         else:
-            raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+            raise ValueError(
+                "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
         if not blob:
             return dict(
                 key=default_key,
@@ -192,8 +190,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
             blob=blob,
         )
 
-class MultiprocessWrapper(SandcrawlerWorker):
 
+class MultiprocessWrapper(SandcrawlerWorker):
     def __init__(self, worker, sink, jobs=None):
         self.counts = Counter()
         self.worker = worker
@@ -226,21 +224,21 @@ class MultiprocessWrapper(SandcrawlerWorker):
         print("Multiprocessing: {}".format(self.counts), file=sys.stderr)
         return worker_counts
 
+
 class BlackholeSink(SandcrawlerWorker):
     """
     Dummy SandcrawlerWorker. That doesn't do or process anything.
 
     Useful for tests.
     """
-
     def push_record(self, task, key=None):
         return
 
     def push_batch(self, tasks):
         return
 
-class KafkaSink(SandcrawlerWorker):
 
+class KafkaSink(SandcrawlerWorker):
     def __init__(self, kafka_hosts, produce_topic, **kwargs):
         self.sink = None
         self.counts = Counter()
@@ -249,13 +247,12 @@ class KafkaSink(SandcrawlerWorker):
 
         config = self.producer_config({
             'bootstrap.servers': kafka_hosts,
-            'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes
+            'message.max.bytes': 30000000,  # ~30 MBytes; broker is ~50 MBytes
             'api.version.request': True,
             'api.version.fallback.ms': 0,
         })
         self.producer = Producer(config)
 
-
     @staticmethod
     def _fail_fast(err, msg):
         if err is not None:
@@ -270,7 +267,7 @@
             'delivery.report.only.error': True,
             'default.topic.config': {
                 'message.timeout.ms': 30000,
-                'request.required.acks': -1, # all brokers must confirm
+                'request.required.acks': -1,  # all brokers must confirm
             }
         })
         return config
@@ -285,11 +282,7 @@
             msg = msg.encode('utf-8')
         assert type(msg) == bytes
 
-        self.producer.produce(
-            self.produce_topic,
-            msg,
-            key=key,
-            on_delivery=self._fail_fast)
+        self.producer.produce(self.produce_topic, msg, key=key, on_delivery=self._fail_fast)
         self.counts['produced'] += 1
 
         # check for errors etc
@@ -308,7 +301,6 @@ class KafkaCompressSink(KafkaSink):
     """
     Variant of KafkaSink for large documents. Used for, eg, GROBID output.
     """
-
     def producer_config(self, kafka_config):
         config = kafka_config.copy()
         config.update({
@@ -319,7 +311,7 @@
             'delivery.report.only.error': True,
             'default.topic.config': {
                 'message.timeout.ms': 30000,
-                'request.required.acks': -1, # all brokers must confirm
+                'request.required.acks': -1,  # all brokers must confirm
             }
         })
         return config
@@ -330,7 +322,6 @@ class RecordPusher:
     Base class for different record sources to be pushed into workers. Pretty
     trivial interface, just wraps an importer and pushes records in to it.
     """
-
     def __init__(self, worker, **kwargs):
         self.counts = Counter()
         self.worker = worker
@@ -348,7 +339,6 @@ class RecordPusher:
 
 
 class JsonLinePusher(RecordPusher):
-
     def __init__(self, worker, json_file, **kwargs):
         self.counts = Counter()
         self.worker = worker
@@ -387,7 +377,6 @@ class JsonLinePusher(RecordPusher):
 
 
 class CdxLinePusher(RecordPusher):
-
     def __init__(self, worker, cdx_file, **kwargs):
         self.counts = Counter()
         self.worker = worker
@@ -409,7 +398,8 @@ class CdxLinePusher(RecordPusher):
             if not record:
                 self.counts['skip-parse'] += 1
                 continue
-            if self.filter_http_statuses and record['http_status'] not in self.filter_http_statuses:
+            if self.filter_http_statuses and record[
+                    'http_status'] not in self.filter_http_statuses:
                 self.counts['skip-http_status'] += 1
                 continue
             if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes:
@@ -434,7 +424,6 @@ class CdxLinePusher(RecordPusher):
 
 
 class ZipfilePusher(RecordPusher):
-
     def __init__(self, worker, zipfile_path, **kwargs):
         self.counts = Counter()
         self.worker = worker
@@ -472,8 +461,8 @@ class ZipfilePusher(RecordPusher):
         print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
         return self.counts
 
-class KafkaJsonPusher(RecordPusher):
 
+class KafkaJsonPusher(RecordPusher):
     def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
         self.counts = Counter()
         self.worker = worker
@@ -499,12 +488,11 @@ class KafkaJsonPusher(RecordPusher):
             # case where there there is one update and thousands of creates;
             # update would be lingering in worker, and if worker crashed
            # never created. Not great.
-            batch = self.consumer.consume(
-                num_messages=self.batch_size,
-                timeout=self.poll_interval)
+            batch = self.consumer.consume(num_messages=self.batch_size,
+                                          timeout=self.poll_interval)
             print("... got {} kafka messages ({}sec poll interval)".format(
-                len(batch), self.poll_interval),
-                file=sys.stderr)
+                len(batch), self.poll_interval),
+                  file=sys.stderr)
             if not batch:
                 # TODO: could have some larger timeout here and
                 # self.worker.finish() if it's been more than, eg, a couple
@@ -541,7 +529,9 @@ class KafkaJsonPusher(RecordPusher):
             while not done:
                 try:
                     # use timeouts; don't want kafka itself to timeout
-                    self.worker.push_record_timeout(record, key=msg.key(), timeout=self.process_timeout_sec)
+                    self.worker.push_record_timeout(record,
+                                                    key=msg.key(),
+                                                    timeout=self.process_timeout_sec)
                     break
                 except SandcrawlerBackoffError as be:
                     print("Backing off for 200 seconds: {}".format(be))
@@ -611,14 +601,14 @@ def make_kafka_consumer(hosts, consume_topic, group):
         for p in partitions:
             if p.error:
                 raise KafkaException(p.error)
-        print("Kafka partitions rebalanced: {} / {}".format(
-            consumer, partitions),
-            file=sys.stderr)
+        print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions),
+              file=sys.stderr)
 
     consumer = Consumer(conf)
     # NOTE: it's actually important that topic_name *not* be bytes (UTF-8
     # encoded)
-    consumer.subscribe([topic_name],
+    consumer.subscribe(
+        [topic_name],
         on_assign=on_rebalance,
         on_revoke=on_rebalance,
     )