aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/workers.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--python/sandcrawler/workers.py625
1 files changed, 625 insertions, 0 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
new file mode 100644
index 0000000..37e3d7a
--- /dev/null
+++ b/python/sandcrawler/workers.py
@@ -0,0 +1,625 @@
+
+import sys
+import json
+import time
+import signal
+import zipfile
+import requests
+import multiprocessing.pool
+from collections import Counter
+from confluent_kafka import Consumer, Producer, KafkaException
+
+from .misc import parse_cdx_line
+from .ia import SandcrawlerBackoffError, WaybackError, WaybackContentError, PetaboxError
+
+
+class SandcrawlerWorker(object):
+ """
+ Base class for sandcrawler workers.
+
+ Usually these get "pushed" into by a RecordPusher. Output goes to another
+ worker (pipeline-style), or defaults to stdout.
+ """
+
+ def __init__(self):
+ self.counts = Counter()
+ self.sink = None
+ # TODO: self.counters
+
+ def push_record(self, task, key=None):
+ self.counts['total'] += 1
+ if not self.want(task):
+ self.counts['skip'] += 1
+ return
+ result = self.process(task, key=key)
+ if not result:
+ self.counts['failed'] += 1
+ return
+ elif type(result) == dict and 'status' in result and len(result['status']) < 32:
+ self.counts[result['status']] += 1
+
+ if self.sink:
+ self.sink.push_record(result)
+ self.counts['pushed'] += 1
+ else:
+ print(json.dumps(result))
+ return result
+
+ def timeout_response(self, task):
+ """
+ This should be overridden by workers that want to return something
+ meaningful when there is a processing timeout. Eg, JSON vs some other
+ error message.
+ """
+ return None
+
+ def push_record_timeout(self, task, key=None, timeout=300):
+ """
+ A wrapper around self.push_record which sets a timeout.
+
+ Note that this uses signals and *will behave wrong/weirdly* with
+ multithreading or if signal-based timeouts are used elsewhere in the
+ same process.
+ """
+
+ def timeout_handler(signum, frame):
+ raise TimeoutError("timeout processing record")
+ signal.signal(signal.SIGALRM, timeout_handler)
+ resp = None
+ signal.alarm(int(timeout))
+ try:
+ resp = self.push_record(task, key=key)
+ except TimeoutError:
+ self.counts['timeout'] += 1
+ resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+ # TODO: what if it is this push_record() itself that is timing out?
+ if resp and self.sink:
+ self.sink.push_record(resp)
+ self.counts['pushed'] += 1
+ elif resp:
+ print(json.dumps(resp))
+ finally:
+ signal.alarm(0)
+ return resp
+
+ def push_batch(self, tasks):
+ results = []
+ for task in tasks:
+ results.append(self.push_record(task))
+ return results
+
+ def finish(self):
+ if self.sink:
+ self.sink.finish()
+ print("Worker: {}".format(self.counts), file=sys.stderr)
+ return self.counts
+
+ def want(self, task):
+ """
+ Optionally override this as a filter in implementations.
+ """
+ return True
+
+ def process(self, task, key=None):
+ """
+ Derived workers need to implement business logic here.
+ """
+ raise NotImplementedError('implementation required')
+
+
+class SandcrawlerFetchWorker(SandcrawlerWorker):
+ """
+ Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg,
+ PDFs) from wayback, archive.org, or other sources.
+ """
+
+ def __init__(self, wayback_client, **kwargs):
+ super().__init__(**kwargs)
+ self.wayback_client = wayback_client
+
+ def fetch_blob(self, record):
+ start_process = time.time()
+ default_key = record['sha1hex']
+ wayback_sec = None
+ petabox_sec = None
+
+ if record.get('warc_path') and record.get('warc_offset'):
+ # it's a full CDX dict. fetch using WaybackClient
+ if not self.wayback_client:
+ raise Exception("wayback client not configured for this PdfTrioWorker")
+ try:
+ start = time.time()
+ blob = self.wayback_client.fetch_petabox_body(
+ csize=record['warc_csize'],
+ offset=record['warc_offset'],
+ warc_path=record['warc_path'],
+ )
+ wayback_sec = time.time() - start
+ except (WaybackError, WaybackContentError, PetaboxError, KeyError) as we:
+ return dict(
+ key=default_key,
+ source=record,
+ status="error-wayback",
+ error_msg=str(we),
+ )
+ elif record.get('url') and record.get('datetime'):
+ # it's a partial CDX dict or something? fetch using WaybackClient
+ if not self.wayback_client:
+ raise Exception("wayback client not configured for this PdfTrioWorker")
+ try:
+ start = time.time()
+ blob = self.wayback_client.fetch_replay_body(
+ url=record['url'],
+ datetime=record['datetime'],
+ )
+ wayback_sec = time.time() - start
+ except (WaybackError, WaybackContentError) as we:
+ return dict(
+ key=default_key,
+ source=record,
+ status="error-wayback",
+ error_msg=str(we),
+ )
+ elif record.get('item') and record.get('path'):
+ # it's petabox link; fetch via HTTP
+ start = time.time()
+ resp = requests.get("https://archive.org/serve/{}/{}".format(
+ record['item'], record['path']))
+ petabox_sec = time.time() - start
+ try:
+ resp.raise_for_status()
+ except Exception as e:
+ return dict(
+ key=default_key,
+ source=record,
+ status="error-petabox",
+ error_msg=str(e),
+ )
+ blob = resp.content
+ else:
+ raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+ if not blob:
+ return dict(
+ key=default_key,
+ source=record,
+ status="empty-blob",
+ )
+ return dict(
+ key=default_key,
+ status="success",
+ source=record,
+ blob=blob,
+ )
+
+class MultiprocessWrapper(SandcrawlerWorker):
+
+ def __init__(self, worker, sink, jobs=None):
+ self.counts = Counter()
+ self.worker = worker
+ self.sink = sink
+ self.pool = multiprocessing.pool.Pool(jobs)
+
+ def push_batch(self, tasks):
+ self.counts['total'] += len(tasks)
+ print("... processing batch of: {}".format(len(tasks)), file=sys.stderr)
+ results = self.pool.map(self.worker.process, tasks)
+ for result in results:
+ if not result:
+ self.counts['failed'] += 1
+ return
+ elif type(result) == dict and 'status' in result and len(result['status']) < 32:
+ self.counts[result['status']] += 1
+
+ if self.sink:
+ self.sink.push_record(result)
+ self.counts['pushed'] += 1
+ else:
+ print(json.dumps(result))
+ return results
+
+ def finish(self):
+ self.pool.terminate()
+ if self.sink:
+ self.sink.finish()
+ worker_counts = self.worker.finish()
+ print("Multiprocessing: {}".format(self.counts), file=sys.stderr)
+ return worker_counts
+
+class BlackholeSink(SandcrawlerWorker):
+ """
+ Dummy SandcrawlerWorker. That doesn't do or process anything.
+
+ Useful for tests.
+ """
+
+ def push_record(self, task, key=None):
+ return
+
+ def push_batch(self, tasks):
+ return
+
+class KafkaSink(SandcrawlerWorker):
+
+ def __init__(self, kafka_hosts, produce_topic, **kwargs):
+ self.sink = None
+ self.counts = Counter()
+ self.produce_topic = produce_topic
+ self.kafka_hosts = kafka_hosts
+
+ config = self.producer_config({
+ 'bootstrap.servers': kafka_hosts,
+ 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes
+ 'api.version.request': True,
+ 'api.version.fallback.ms': 0,
+ })
+ self.producer = Producer(config)
+
+
+ @staticmethod
+ def _fail_fast(err, msg):
+ if err is not None:
+ print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+
+ def producer_config(self, kafka_config):
+ config = kafka_config.copy()
+ config.update({
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'message.timeout.ms': 30000,
+ 'request.required.acks': -1, # all brokers must confirm
+ }
+ })
+ return config
+
+ def push_record(self, msg, key=None):
+ self.counts['total'] += 1
+ if type(msg) == dict:
+ if not key and 'key' in msg:
+ key = msg['key']
+ msg = json.dumps(msg)
+ if type(msg) == str:
+ msg = msg.encode('utf-8')
+ assert type(msg) == bytes
+
+ self.producer.produce(
+ self.produce_topic,
+ msg,
+ key=key,
+ on_delivery=self._fail_fast)
+ self.counts['produced'] += 1
+
+ # check for errors etc
+ self.producer.poll(0)
+
+ def push_batch(self, msgs):
+ for m in msgs:
+ self.push_record(m)
+
+ def finish(self):
+ self.producer.flush()
+ return self.counts
+
+
+class KafkaCompressSink(KafkaSink):
+ """
+ Variant of KafkaSink for large documents. Used for, eg, GROBID output.
+ """
+
+ def producer_config(self, kafka_config):
+ config = kafka_config.copy()
+ config.update({
+ 'compression.codec': 'gzip',
+ 'retry.backoff.ms': 250,
+ 'linger.ms': 1000,
+ 'batch.num.messages': 50,
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'message.timeout.ms': 30000,
+ 'request.required.acks': -1, # all brokers must confirm
+ }
+ })
+ return config
+
+
+class RecordPusher:
+ """
+ Base class for different record sources to be pushed into workers. Pretty
+ trivial interface, just wraps an importer and pushes records in to it.
+ """
+
+ def __init__(self, worker, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+
+ def run(self):
+ """
+ This will look something like:
+
+ for line in sys.stdin:
+ record = json.loads(line)
+ self.worker.push_record(record)
+ print(self.worker.finish())
+ """
+ raise NotImplementedError
+
+
+class JsonLinePusher(RecordPusher):
+
+ def __init__(self, worker, json_file, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+ self.json_file = json_file
+ self.batch_size = kwargs.get('batch_size', None)
+ if self.batch_size in (0, 1):
+ self.batch_size = None
+
+ def run(self):
+ batch = []
+ for line in self.json_file:
+ if not line:
+ continue
+ self.counts['total'] += 1
+ try:
+ record = json.loads(line)
+ except json.decoder.JSONDecodeError:
+ self.counts['error-json-decode'] += 1
+ continue
+ if self.batch_size:
+ batch.append(record)
+ if len(batch) >= self.batch_size:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ else:
+ self.worker.push_record(record)
+ self.counts['pushed'] += 1
+ if self.batch_size and batch:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ worker_counts = self.worker.finish()
+ print("JSON lines pushed: {}".format(self.counts), file=sys.stderr)
+ return self.counts
+
+
+class CdxLinePusher(RecordPusher):
+
+ def __init__(self, worker, cdx_file, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+ self.cdx_file = cdx_file
+ self.filter_http_statuses = kwargs.get('filter_http_statuses', None)
+ self.filter_mimetypes = kwargs.get('filter_mimetypes', None)
+ self.allow_octet_stream = kwargs.get('allow_octet_stream', False)
+ self.batch_size = kwargs.get('batch_size', None)
+ if self.batch_size in (0, 1):
+ self.batch_size = None
+
+ def run(self):
+ batch = []
+ for line in self.cdx_file:
+ if not line:
+ continue
+ self.counts['total'] += 1
+ record = parse_cdx_line(line, normalize=True)
+ if not record:
+ self.counts['skip-parse'] += 1
+ continue
+ if self.filter_http_statuses and record['http_status'] not in self.filter_http_statuses:
+ self.counts['skip-http_status'] += 1
+ continue
+ if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes:
+ self.counts['skip-mimetype'] += 1
+ continue
+ if self.batch_size:
+ batch.append(record)
+ if len(batch) >= self.batch_size:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ else:
+ self.worker.push_record(record)
+ self.counts['pushed'] += 1
+ if self.batch_size and batch:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ worker_counts = self.worker.finish()
+ print("CDX lines pushed: {}".format(self.counts), file=sys.stderr)
+ return self.counts
+
+
+class ZipfilePusher(RecordPusher):
+
+ def __init__(self, worker, zipfile_path, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+ self.filter_suffix = ".pdf"
+ self.zipfile_path = zipfile_path
+ self.batch_size = kwargs.get('batch_size', None)
+ if self.batch_size in (0, 1):
+ self.batch_size = None
+
+ def run(self):
+ batch = []
+ with zipfile.ZipFile(self.zipfile_path, 'r') as archive:
+ for zipinfo in archive.infolist():
+ if not zipinfo.filename.endswith(self.filter_suffix):
+ continue
+ self.counts['total'] += 1
+ # NB doesn't really extract the file, just gives you a stream (file-like-object) for reading it
+ flo = archive.open(zipinfo, 'r')
+ data = flo.read(2**32)
+ flo.close()
+ if self.batch_size:
+ batch.append(data)
+ if len(batch) >= self.batch_size:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ else:
+ self.worker.push_record(data)
+ self.counts['pushed'] += 1
+ if self.batch_size and batch:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ worker_counts = self.worker.finish()
+ print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
+ return self.counts
+
+class KafkaJsonPusher(RecordPusher):
+
+ def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+ self.consumer = make_kafka_consumer(
+ kafka_hosts,
+ consume_topic,
+ group,
+ )
+ self.push_batches = kwargs.get('push_batches', False)
+ self.raw_records = kwargs.get('raw_records', False)
+ self.poll_interval = kwargs.get('poll_interval', 5.0)
+ self.batch_size = kwargs.get('batch_size', 100)
+ if self.batch_size in (0, 1):
+ self.batch_size = 1
+ self.batch_worker = kwargs.get('batch_worker', False)
+ self.process_timeout_sec = kwargs.get('process_timeout_sec', 300)
+
+ def run(self):
+ while True:
+ # TODO: this is batch-oriented, because underlying worker is
+ # often batch-oriented, but this doesn't confirm that entire batch
+ # has been pushed to fatcat before commiting offset. Eg, consider
+ # case where there there is one update and thousands of creates;
+ # update would be lingering in worker, and if worker crashed
+ # never created. Not great.
+ batch = self.consumer.consume(
+ num_messages=self.batch_size,
+ timeout=self.poll_interval)
+ print("... got {} kafka messages ({}sec poll interval)".format(
+ len(batch), self.poll_interval),
+ file=sys.stderr)
+ if not batch:
+ # TODO: could have some larger timeout here and
+ # self.worker.finish() if it's been more than, eg, a couple
+ # minutes
+ continue
+ # first check errors on entire batch...
+ for msg in batch:
+ if msg.error():
+ raise KafkaException(msg.error())
+ # ... then process
+ if self.push_batches:
+ self.counts['total'] += len(batch)
+ records = [json.loads(msg.value().decode('utf-8')) for msg in batch]
+ self.worker.push_batch(records)
+ self.counts['pushed'] += len(batch)
+ print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
+ else:
+ for msg in batch:
+ self.counts['total'] += 1
+ if self.raw_records:
+ # In this mode, pass the Kafka message as bytes through
+ # without decoding as JSON. Eg, for thumbnails (where
+ # message bytes are JPEG, and we need # the sha1hex key
+ # from the message)
+ record = msg.value()
+ else:
+ record = json.loads(msg.value().decode('utf-8'))
+ # This complex bit of code implements backoff/backpressure
+ # in a way that will not cause this Kafka consumer to lose
+ # partition assignments (resulting in a rebalance). This
+ # was needed for the ingest workers. There is probably a
+ # better way to structure this concurrency.
+ done = False
+ while not done:
+ try:
+ # use timeouts; don't want kafka itself to timeout
+ self.worker.push_record_timeout(record, key=msg.key(), timeout=self.process_timeout_sec)
+ break
+ except SandcrawlerBackoffError as be:
+ print("Backing off for 200 seconds: {}".format(be))
+ self.consumer.pause(self.consumer.assignment())
+ for i in range(40):
+ # Beware this poll which should not be
+ # receiving any messages because we are paused!
+ empty_batch = self.consumer.poll(0)
+ assert not empty_batch
+ time.sleep(5)
+ self.consumer.resume(self.consumer.assignment())
+ self.counts['pushed'] += 1
+ if self.counts['total'] % 500 == 0:
+ print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
+ for msg in batch:
+ # locally store offsets of processed messages; will be
+ # auto-commited by librdkafka from this "stored" value
+ self.consumer.store_offsets(message=msg)
+
+ # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
+ # commit the current batch if it has been lingering
+ worker_counts = self.worker.finish()
+ print("KafkaJson lines pushed: {}".format(self.counts), file=sys.stderr)
+ self.consumer.close()
+ return self.counts
+
+
+def make_kafka_consumer(hosts, consume_topic, group):
+ topic_name = consume_topic
+
+ def fail_fast(err, partitions):
+ if err is not None:
+ print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ for p in partitions:
+ # check for partition-specific commit errors
+ if p.error:
+ print("Kafka consumer commit error: {}".format(p.error), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(p.error)
+ #print("Kafka consumer commit successful")
+ pass
+
+ # previously, using pykafka
+ #auto_commit_enable=True,
+ #auto_commit_interval_ms=30000, # 30 seconds
+ conf = {
+ 'bootstrap.servers': hosts,
+ 'group.id': group,
+ 'on_commit': fail_fast,
+ # messages don't have offset marked as stored until processed,
+ # but we do auto-commit stored offsets to broker
+ 'enable.auto.offset.store': False,
+ 'enable.auto.commit': True,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 6min)
+ 'max.poll.interval.ms': 360000,
+ 'default.topic.config': {
+ 'auto.offset.reset': 'latest',
+ },
+ }
+
+ def on_rebalance(consumer, partitions):
+ for p in partitions:
+ if p.error:
+ raise KafkaException(p.error)
+ print("Kafka partitions rebalanced: {} / {}".format(
+ consumer, partitions),
+ file=sys.stderr)
+
+ consumer = Consumer(conf)
+ # NOTE: it's actually important that topic_name *not* be bytes (UTF-8
+ # encoded)
+ consumer.subscribe([topic_name],
+ on_assign=on_rebalance,
+ on_revoke=on_rebalance,
+ )
+ print("Consuming from kafka topic {}, group {}".format(topic_name, group), file=sys.stderr)
+ return consumer