Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--   python/sandcrawler/workers.py | 289
1 file changed, 161 insertions, 128 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index ceb6671..bd7f36a 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -10,8 +10,13 @@ from typing import Any, Dict, List, Optional, Sequence
 import requests
 from confluent_kafka import Consumer, KafkaException, Producer
 
-from .ia import (PetaboxError, SandcrawlerBackoffError, WaybackClient, WaybackContentError,
-                 WaybackError)
+from .ia import (
+    PetaboxError,
+    SandcrawlerBackoffError,
+    WaybackClient,
+    WaybackContentError,
+    WaybackError,
+)
 from .misc import parse_cdx_line
 
 
@@ -22,25 +27,26 @@ class SandcrawlerWorker(object):
     Usually these get "pushed" into by a RecordPusher. Output goes to another
     worker (pipeline-style), or defaults to stdout.
     """
-    def __init__(self, sink: Optional['SandcrawlerWorker'] = None):
+
+    def __init__(self, sink: Optional["SandcrawlerWorker"] = None):
         self.counts: Counter = Counter()
         self.sink: Optional[SandcrawlerWorker] = sink
 
     def push_record(self, task: Any, key: Optional[str] = None) -> Any:
-        self.counts['total'] += 1
+        self.counts["total"] += 1
         if not self.want(task):
-            self.counts['skip'] += 1
+            self.counts["skip"] += 1
             return
         result = self.process(task, key=key)
         if not result:
-            self.counts['failed'] += 1
+            self.counts["failed"] += 1
             return
-        elif type(result) == dict and 'status' in result and len(result['status']) < 32:
-            self.counts[result['status']] += 1
+        elif type(result) == dict and "status" in result and len(result["status"]) < 32:
+            self.counts[result["status"]] += 1
         if self.sink:
             self.sink.push_record(result)
-            self.counts['pushed'] += 1
+            self.counts["pushed"] += 1
         else:
             print(json.dumps(result))
         return result
@@ -53,10 +59,9 @@ class SandcrawlerWorker(object):
         """
         return None
 
-    def push_record_timeout(self,
-                            task: Any,
-                            key: Optional[str] = None,
-                            timeout: int = 300) -> Any:
+    def push_record_timeout(
+        self, task: Any, key: Optional[str] = None, timeout: int = 300
+    ) -> Any:
         """
         A wrapper around self.push_record which sets a timeout.
 
@@ -64,6 +69,7 @@ class SandcrawlerWorker(object):
         multithreading or if signal-based timeouts are used elsewhere in the
         same process.
         """
+
         def timeout_handler(signum: int, frame: Any) -> None:
             raise TimeoutError("timeout processing record")
 
@@ -73,12 +79,12 @@ class SandcrawlerWorker(object):
         try:
             resp = self.push_record(task, key=key)
         except TimeoutError:
-            self.counts['timeout'] += 1
+            self.counts["timeout"] += 1
             resp = self.timeout_response(task)  # pylint: disable=assignment-from-none
             # TODO: what if it is this push_record() itself that is timing out?
             if resp and self.sink:
                 self.sink.push_record(resp)
-                self.counts['pushed'] += 1
+                self.counts["pushed"] += 1
             elif resp:
                 print(json.dumps(resp))
         finally:
@@ -109,7 +115,7 @@ class SandcrawlerWorker(object):
 
         TODO: should derived workers explicitly type-check the 'task' object?
         """
-        raise NotImplementedError('implementation required')
+        raise NotImplementedError("implementation required")
 
 
 class SandcrawlerFetchWorker(SandcrawlerWorker):
@@ -117,25 +123,26 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
     Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg,
     PDFs) from wayback, archive.org, or other sources.
     """
+
    def __init__(self, wayback_client: Optional[WaybackClient], **kwargs):
        super().__init__(**kwargs)
        self.wayback_client = wayback_client
 
    def fetch_blob(self, record: Dict[str, Any]) -> Dict[str, Any]:
-        default_key = record['sha1hex']
+        default_key = record["sha1hex"]
         wayback_sec = None
         petabox_sec = None
 
-        if record.get('warc_path') and record.get('warc_offset'):
+        if record.get("warc_path") and record.get("warc_offset"):
             # it's a full CDX dict. fetch using WaybackClient
             if not self.wayback_client:
                 raise Exception("wayback client not configured for this SandcrawlerFetchWorker")
             try:
                 start = time.time()
                 blob: bytes = self.wayback_client.fetch_petabox_body(
-                    csize=record['warc_csize'],
-                    offset=record['warc_offset'],
-                    warc_path=record['warc_path'],
+                    csize=record["warc_csize"],
+                    offset=record["warc_offset"],
+                    warc_path=record["warc_path"],
                 )
                 wayback_sec = time.time() - start
             except (WaybackError, WaybackContentError, PetaboxError, KeyError) as we:
@@ -145,15 +152,15 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
                     status="error-wayback",
                     error_msg=str(we),
                 )
-        elif record.get('url') and record.get('datetime'):
+        elif record.get("url") and record.get("datetime"):
             # it's a partial CDX dict or something? fetch using WaybackClient
             if not self.wayback_client:
                 raise Exception("wayback client not configured for this SandcrawlerFetchWorker")
             try:
                 start = time.time()
                 blob = self.wayback_client.fetch_replay_body(
-                    url=record['url'],
-                    datetime=record['datetime'],
+                    url=record["url"],
+                    datetime=record["datetime"],
                 )
                 wayback_sec = time.time() - start
             except (WaybackError, WaybackContentError) as we:
@@ -163,11 +170,12 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
                     status="error-wayback",
                     error_msg=str(we),
                 )
-        elif record.get('item') and record.get('path'):
+        elif record.get("item") and record.get("path"):
             # it's petabox link; fetch via HTTP
             start = time.time()
-            ia_resp = requests.get("https://archive.org/serve/{}/{}".format(
-                record['item'], record['path']))
+            ia_resp = requests.get(
+                "https://archive.org/serve/{}/{}".format(record["item"], record["path"])
+            )
             petabox_sec = time.time() - start
             try:
                 ia_resp.raise_for_status()
@@ -181,7 +189,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
             blob = ia_resp.content
         else:
             raise ValueError(
-                "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+                "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed"
+            )
         if not blob:
             return dict(
                 key=default_key,
@@ -201,29 +210,31 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
 
 
 class MultiprocessWrapper(SandcrawlerWorker):
-    def __init__(self,
-                 worker: SandcrawlerWorker,
-                 sink: Optional[SandcrawlerWorker] = None,
-                 jobs: Optional[int] = None):
+    def __init__(
+        self,
+        worker: SandcrawlerWorker,
+        sink: Optional[SandcrawlerWorker] = None,
+        jobs: Optional[int] = None,
+    ):
         self.counts = Counter()
         self.worker = worker
         self.sink = sink
         self.pool = multiprocessing.pool.Pool(jobs)
 
     def push_batch(self, tasks: List[Any]) -> List[Any]:
-        self.counts['total'] += len(tasks)
+        self.counts["total"] += len(tasks)
         print("... processing batch of: {}".format(len(tasks)), file=sys.stderr)
         results = self.pool.map(self.worker.process, tasks)
         for result in results:
             if not result:
-                self.counts['failed'] += 1
+                self.counts["failed"] += 1
                 return []
-            elif type(result) == dict and 'status' in result and len(result['status']) < 32:
-                self.counts[result['status']] += 1
+            elif type(result) == dict and "status" in result and len(result["status"]) < 32:
+                self.counts[result["status"]] += 1
             if self.sink:
                 self.sink.push_record(result)
-                self.counts['pushed'] += 1
+                self.counts["pushed"] += 1
             else:
                 print(json.dumps(result))
         return results
@@ -243,6 +254,7 @@ class BlackholeSink(SandcrawlerWorker):
 
     Useful for tests.
     """
+
     def push_record(self, task: Any, key: Optional[str] = None) -> Any:
         return
 
@@ -257,12 +269,14 @@ class KafkaSink(SandcrawlerWorker):
         self.produce_topic = produce_topic
         self.kafka_hosts = kafka_hosts
 
-        config = self.producer_config({
-            'bootstrap.servers': kafka_hosts,
-            'message.max.bytes': 30000000,  # ~30 MBytes; broker is ~50 MBytes
-            'api.version.request': True,
-            'api.version.fallback.ms': 0,
-        })
+        config = self.producer_config(
+            {
+                "bootstrap.servers": kafka_hosts,
+                "message.max.bytes": 30000000,  # ~30 MBytes; broker is ~50 MBytes
+                "api.version.request": True,
+                "api.version.fallback.ms": 0,
+            }
+        )
         self.producer = Producer(config)
 
     @staticmethod
@@ -275,27 +289,29 @@ class KafkaSink(SandcrawlerWorker):
 
     def producer_config(self, kafka_config: dict) -> dict:
         config = kafka_config.copy()
-        config.update({
-            'delivery.report.only.error': True,
-            'default.topic.config': {
-                'message.timeout.ms': 30000,
-                'request.required.acks': -1,  # all brokers must confirm
+        config.update(
+            {
+                "delivery.report.only.error": True,
+                "default.topic.config": {
+                    "message.timeout.ms": 30000,
+                    "request.required.acks": -1,  # all brokers must confirm
+                },
             }
-        })
+        )
         return config
 
     def push_record(self, msg: Any, key: Optional[str] = None) -> Any:
-        self.counts['total'] += 1
+        self.counts["total"] += 1
         if type(msg) == dict:
-            if not key and 'key' in msg:
-                key = msg['key']
+            if not key and "key" in msg:
+                key = msg["key"]
             msg = json.dumps(msg)
         if type(msg) == str:
-            msg = msg.encode('utf-8')
+            msg = msg.encode("utf-8")
         assert type(msg) == bytes
         self.producer.produce(self.produce_topic, msg, key=key, on_delivery=self._fail_fast)
-        self.counts['produced'] += 1
+        self.counts["produced"] += 1
         # check for errors etc
         self.producer.poll(0)
 
@@ -314,19 +330,22 @@ class KafkaCompressSink(KafkaSink):
     """
     Variant of KafkaSink for large documents. Used for, eg, GROBID output.
     """
+
     def producer_config(self, kafka_config: Dict[str, Any]) -> Dict[str, Any]:
         config = kafka_config.copy()
-        config.update({
-            'compression.codec': 'gzip',
-            'retry.backoff.ms': 250,
-            'linger.ms': 1000,
-            'batch.num.messages': 50,
-            'delivery.report.only.error': True,
-            'default.topic.config': {
-                'message.timeout.ms': 30000,
-                'request.required.acks': -1,  # all brokers must confirm
+        config.update(
+            {
+                "compression.codec": "gzip",
+                "retry.backoff.ms": 250,
+                "linger.ms": 1000,
+                "batch.num.messages": 50,
+                "delivery.report.only.error": True,
+                "default.topic.config": {
+                    "message.timeout.ms": 30000,
+                    "request.required.acks": -1,  # all brokers must confirm
+                },
             }
-        })
+        )
         return config
 
 
@@ -335,6 +354,7 @@ class RecordPusher:
     Base class for different record sources to be pushed into workers. Pretty
     trivial interface, just wraps an importer and pushes records in to it.
     """
+
     def __init__(self, worker: SandcrawlerWorker, **kwargs):
         self.counts: Counter = Counter()
         self.worker: SandcrawlerWorker = worker
@@ -356,7 +376,7 @@ class JsonLinePusher(RecordPusher):
         self.counts = Counter()
         self.worker = worker
         self.json_file = json_file
-        self.batch_size = kwargs.get('batch_size', None)
+        self.batch_size = kwargs.get("batch_size", None)
         if self.batch_size in (0, 1):
             self.batch_size = None
 
@@ -365,24 +385,24 @@ class JsonLinePusher(RecordPusher):
         for line in self.json_file:
             if not line:
                 continue
-            self.counts['total'] += 1
+            self.counts["total"] += 1
             try:
                 record = json.loads(line)
             except json.decoder.JSONDecodeError:
-                self.counts['error-json-decode'] += 1
+                self.counts["error-json-decode"] += 1
                 continue
             if self.batch_size:
                 batch.append(record)
                 if len(batch) >= self.batch_size:
                     self.worker.push_batch(batch)
-                    self.counts['pushed'] += len(batch)
+                    self.counts["pushed"] += len(batch)
                     batch = []
             else:
                 self.worker.push_record(record)
-                self.counts['pushed'] += 1
+                self.counts["pushed"] += 1
         if self.batch_size and batch:
             self.worker.push_batch(batch)
-            self.counts['pushed'] += len(batch)
+            self.counts["pushed"] += len(batch)
             batch = []
         self.worker.finish()
         print("JSON lines pushed: {}".format(self.counts), file=sys.stderr)
@@ -394,10 +414,10 @@ class CdxLinePusher(RecordPusher):
         self.counts = Counter()
         self.worker = worker
         self.cdx_file = cdx_file
-        self.filter_http_statuses = kwargs.get('filter_http_statuses', None)
-        self.filter_mimetypes = kwargs.get('filter_mimetypes', None)
-        self.allow_octet_stream = kwargs.get('allow_octet_stream', False)
-        self.batch_size = kwargs.get('batch_size', None)
+        self.filter_http_statuses = kwargs.get("filter_http_statuses", None)
+        self.filter_mimetypes = kwargs.get("filter_mimetypes", None)
+        self.allow_octet_stream = kwargs.get("allow_octet_stream", False)
+        self.batch_size = kwargs.get("batch_size", None)
         if self.batch_size in (0, 1):
             self.batch_size = None
 
@@ -406,30 +426,32 @@ class CdxLinePusher(RecordPusher):
         for line in self.cdx_file:
             if not line:
                 continue
-            self.counts['total'] += 1
+            self.counts["total"] += 1
             record = parse_cdx_line(line, normalize=True)
             if not record:
-                self.counts['skip-parse'] += 1
+                self.counts["skip-parse"] += 1
                 continue
-            if self.filter_http_statuses and record[
-                    'http_status'] not in self.filter_http_statuses:
-                self.counts['skip-http_status'] += 1
+            if (
+                self.filter_http_statuses
+                and record["http_status"] not in self.filter_http_statuses
+            ):
+                self.counts["skip-http_status"] += 1
                 continue
-            if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes:
-                self.counts['skip-mimetype'] += 1
+            if self.filter_mimetypes and record["mimetype"] not in self.filter_mimetypes:
+                self.counts["skip-mimetype"] += 1
                 continue
             if self.batch_size:
                 batch.append(record)
                 if len(batch) >= self.batch_size:
                     self.worker.push_batch(batch)
-                    self.counts['pushed'] += len(batch)
+                    self.counts["pushed"] += len(batch)
                     batch = []
             else:
                 self.worker.push_record(record)
-                self.counts['pushed'] += 1
+                self.counts["pushed"] += 1
         if self.batch_size and batch:
             self.worker.push_batch(batch)
-            self.counts['pushed'] += len(batch)
+            self.counts["pushed"] += len(batch)
             batch = []
         self.worker.finish()
         print("CDX lines pushed: {}".format(self.counts), file=sys.stderr)
@@ -442,33 +464,33 @@ class ZipfilePusher(RecordPusher):
         self.worker = worker
         self.filter_suffix = ".pdf"
         self.zipfile_path = zipfile_path
-        self.batch_size = kwargs.get('batch_size', None)
+        self.batch_size = kwargs.get("batch_size", None)
         if self.batch_size in (0, 1):
             self.batch_size = None
 
     def run(self) -> Counter:
         batch = []
-        with zipfile.ZipFile(self.zipfile_path, 'r') as archive:
+        with zipfile.ZipFile(self.zipfile_path, "r") as archive:
             for zipinfo in archive.infolist():
                 if not zipinfo.filename.endswith(self.filter_suffix):
                     continue
-                self.counts['total'] += 1
+                self.counts["total"] += 1
                 # NB doesn't really extract the file, just gives you a stream (file-like-object) for reading it
-                flo = archive.open(zipinfo, 'r')
-                data = flo.read(2**32)
+                flo = archive.open(zipinfo, "r")
+                data = flo.read(2 ** 32)
                 flo.close()
                 if self.batch_size:
                     batch.append(data)
                     if len(batch) >= self.batch_size:
                         self.worker.push_batch(batch)
-                        self.counts['pushed'] += len(batch)
+                        self.counts["pushed"] += len(batch)
                         batch = []
                 else:
                     self.worker.push_record(data)
-                    self.counts['pushed'] += 1
+                    self.counts["pushed"] += 1
         if self.batch_size and batch:
             self.worker.push_batch(batch)
-            self.counts['pushed'] += len(batch)
+            self.counts["pushed"] += len(batch)
             batch = []
         self.worker.finish()
         print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
@@ -476,8 +498,14 @@ class ZipfilePusher(RecordPusher):
 
 
 class KafkaJsonPusher(RecordPusher):
-    def __init__(self, worker: SandcrawlerWorker, kafka_hosts: str, consume_topic: str,
-                 group: str, **kwargs):
+    def __init__(
+        self,
+        worker: SandcrawlerWorker,
+        kafka_hosts: str,
+        consume_topic: str,
+        group: str,
+        **kwargs
+    ):
         self.counts = Counter()
         self.worker = worker
         self.consumer = make_kafka_consumer(
@@ -485,14 +513,14 @@ class KafkaJsonPusher(RecordPusher):
             consume_topic,
             group,
         )
-        self.push_batches = kwargs.get('push_batches', False)
-        self.raw_records = kwargs.get('raw_records', False)
-        self.poll_interval = kwargs.get('poll_interval', 5.0)
-        self.batch_size = kwargs.get('batch_size', 100)
+        self.push_batches = kwargs.get("push_batches", False)
+        self.raw_records = kwargs.get("raw_records", False)
+        self.poll_interval = kwargs.get("poll_interval", 5.0)
+        self.batch_size = kwargs.get("batch_size", 100)
         if self.batch_size in (0, 1):
             self.batch_size = 1
-        self.batch_worker = kwargs.get('batch_worker', False)
-        self.process_timeout_sec = kwargs.get('process_timeout_sec', 300)
+        self.batch_worker = kwargs.get("batch_worker", False)
+        self.process_timeout_sec = kwargs.get("process_timeout_sec", 300)
 
     def run(self) -> Counter:
         while True:
@@ -502,11 +530,15 @@ class KafkaJsonPusher(RecordPusher):
             # case where there there is one update and thousands of creates;
             # update would be lingering in worker, and if worker crashed
             # never created. Not great.
-            batch = self.consumer.consume(num_messages=self.batch_size,
-                                          timeout=self.poll_interval)
-            print("... got {} kafka messages ({}sec poll interval)".format(
-                len(batch), self.poll_interval),
-                  file=sys.stderr)
+            batch = self.consumer.consume(
+                num_messages=self.batch_size, timeout=self.poll_interval
+            )
+            print(
+                "... got {} kafka messages ({}sec poll interval)".format(
+                    len(batch), self.poll_interval
+                ),
+                file=sys.stderr,
+            )
             if not batch:
                 # TODO: could have some larger timeout here and
                 # self.worker.finish() if it's been more than, eg, a couple
@@ -518,14 +550,14 @@ class KafkaJsonPusher(RecordPusher):
                     raise KafkaException(msg.error())
             # ... then process
             if self.push_batches:
-                self.counts['total'] += len(batch)
-                records = [json.loads(msg.value().decode('utf-8')) for msg in batch]
+                self.counts["total"] += len(batch)
+                records = [json.loads(msg.value().decode("utf-8")) for msg in batch]
                 self.worker.push_batch(records)
-                self.counts['pushed'] += len(batch)
+                self.counts["pushed"] += len(batch)
                 print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
             else:
                 for msg in batch:
-                    self.counts['total'] += 1
+                    self.counts["total"] += 1
                     if self.raw_records:
                         # In this mode, pass the Kafka message as bytes through
                         # without decoding as JSON. Eg, for thumbnails (where
@@ -533,7 +565,7 @@ class KafkaJsonPusher(RecordPusher):
                         # from the message)
                         record = msg.value()
                     else:
-                        record = json.loads(msg.value().decode('utf-8'))
+                        record = json.loads(msg.value().decode("utf-8"))
                     # This complex bit of code implements backoff/backpressure
                     # in a way that will not cause this Kafka consumer to lose
                     # partition assignments (resulting in a rebalance). This
@@ -543,9 +575,9 @@ class KafkaJsonPusher(RecordPusher):
                     while not done:
                         try:
                             # use timeouts; don't want kafka itself to timeout
-                            self.worker.push_record_timeout(record,
-                                                            key=msg.key(),
-                                                            timeout=self.process_timeout_sec)
+                            self.worker.push_record_timeout(
+                                record, key=msg.key(), timeout=self.process_timeout_sec
+                            )
                             break
                         except SandcrawlerBackoffError as be:
                             print("Backing off for 200 seconds: {}".format(be))
@@ -557,8 +589,8 @@ class KafkaJsonPusher(RecordPusher):
                                 assert not empty_batch
                                 time.sleep(5)
                             self.consumer.resume(self.consumer.assignment())
-                    self.counts['pushed'] += 1
-                    if self.counts['total'] % 500 == 0:
+                    self.counts["pushed"] += 1
+                    if self.counts["total"] % 500 == 0:
                         print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
             for msg in batch:
                 # locally store offsets of processed messages; will be
@@ -589,25 +621,25 @@ def make_kafka_consumer(hosts: str, consume_topic: str, group: str) -> Consumer:
                 print("Bailing out...", file=sys.stderr)
                 # TODO: should it be sys.exit(-1)?
                 raise KafkaException(p.error)
-        #print("Kafka consumer commit successful")
+        # print("Kafka consumer commit successful")
         pass
 
     # previously, using pykafka
-    #auto_commit_enable=True,
-    #auto_commit_interval_ms=30000, # 30 seconds
+    # auto_commit_enable=True,
+    # auto_commit_interval_ms=30000, # 30 seconds
     conf = {
-        'bootstrap.servers': hosts,
-        'group.id': group,
-        'on_commit': fail_fast,
+        "bootstrap.servers": hosts,
+        "group.id": group,
+        "on_commit": fail_fast,
         # messages don't have offset marked as stored until processed,
         # but we do auto-commit stored offsets to broker
-        'enable.auto.offset.store': False,
-        'enable.auto.commit': True,
+        "enable.auto.offset.store": False,
+        "enable.auto.commit": True,
         # user code timeout; if no poll after this long, assume user code
        # hung and rebalance (default: 6min)
-        'max.poll.interval.ms': 360000,
-        'default.topic.config': {
-            'auto.offset.reset': 'latest',
+        "max.poll.interval.ms": 360000,
+        "default.topic.config": {
+            "auto.offset.reset": "latest",
         },
     }
 
@@ -615,8 +647,9 @@ def make_kafka_consumer(hosts: str, consume_topic: str, group: str) -> Consumer:
         for p in partitions:
             if p.error:
                 raise KafkaException(p.error)
-        print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions),
-              file=sys.stderr)
+        print(
+            "Kafka partitions rebalanced: {} / {}".format(consumer, partitions), file=sys.stderr
+        )
 
     consumer = Consumer(conf)
     # NOTE: it's actually important that topic_name *not* be bytes (UTF-8