Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--  python/sandcrawler/workers.py  289
1 file changed, 161 insertions, 128 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index ceb6671..bd7f36a 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -10,8 +10,13 @@ from typing import Any, Dict, List, Optional, Sequence
import requests
from confluent_kafka import Consumer, KafkaException, Producer
-from .ia import (PetaboxError, SandcrawlerBackoffError, WaybackClient, WaybackContentError,
- WaybackError)
+from .ia import (
+ PetaboxError,
+ SandcrawlerBackoffError,
+ WaybackClient,
+ WaybackContentError,
+ WaybackError,
+)
from .misc import parse_cdx_line
@@ -22,25 +27,26 @@ class SandcrawlerWorker(object):
Usually these get "pushed" into by a RecordPusher. Output goes to another
worker (pipeline-style), or defaults to stdout.
"""
- def __init__(self, sink: Optional['SandcrawlerWorker'] = None):
+
+ def __init__(self, sink: Optional["SandcrawlerWorker"] = None):
self.counts: Counter = Counter()
self.sink: Optional[SandcrawlerWorker] = sink
def push_record(self, task: Any, key: Optional[str] = None) -> Any:
- self.counts['total'] += 1
+ self.counts["total"] += 1
if not self.want(task):
- self.counts['skip'] += 1
+ self.counts["skip"] += 1
return
result = self.process(task, key=key)
if not result:
- self.counts['failed'] += 1
+ self.counts["failed"] += 1
return
- elif type(result) == dict and 'status' in result and len(result['status']) < 32:
- self.counts[result['status']] += 1
+ elif type(result) == dict and "status" in result and len(result["status"]) < 32:
+ self.counts[result["status"]] += 1
if self.sink:
self.sink.push_record(result)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
else:
print(json.dumps(result))
return result
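
For orientation, the pipeline wiring the SandcrawlerWorker docstring describes can be sketched like this. It is illustrative only: the UppercaseWorker subclass is not part of this commit, and it assumes the base class's default want() accepts every task.

from sandcrawler.workers import BlackholeSink, SandcrawlerWorker

class UppercaseWorker(SandcrawlerWorker):
    # toy processor: returns a dict with a short 'status' string, so
    # push_record() tallies it in self.counts
    def process(self, task, key=None):
        return {"status": "success", "text": str(task.get("text", "")).upper()}

# with a sink, results flow to the next worker; without one, push_record()
# prints the result as JSON to stdout
worker = UppercaseWorker(sink=BlackholeSink())
worker.push_record({"text": "hello"})
print(worker.counts)  # expected: Counter({'total': 1, 'success': 1, 'pushed': 1})
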
@@ -53,10 +59,9 @@ class SandcrawlerWorker(object):
"""
return None
- def push_record_timeout(self,
- task: Any,
- key: Optional[str] = None,
- timeout: int = 300) -> Any:
+ def push_record_timeout(
+ self, task: Any, key: Optional[str] = None, timeout: int = 300
+ ) -> Any:
"""
A wrapper around self.push_record which sets a timeout.
@@ -64,6 +69,7 @@ class SandcrawlerWorker(object):
multithreading or if signal-based timeouts are used elsewhere in the
same process.
"""
+
def timeout_handler(signum: int, frame: Any) -> None:
raise TimeoutError("timeout processing record")
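
The timeout mechanism above relies on SIGALRM. A minimal standalone sketch of the same pattern (Unix-only; slow_call() is a hypothetical stand-in) shows why it cannot be mixed with multithreading: signal handlers only fire in the main thread.

import signal
import time

def timeout_handler(signum, frame):
    raise TimeoutError("timeout processing record")

def slow_call():  # stand-in for self.push_record(task, key=key)
    time.sleep(10)

old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(3)  # deliver SIGALRM in ~3 seconds
try:
    slow_call()
except TimeoutError:
    print("timed out; the worker would increment counts['timeout'] here")
finally:
    signal.alarm(0)  # always cancel any pending alarm
    signal.signal(signal.SIGALRM, old_handler)
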
@@ -73,12 +79,12 @@ class SandcrawlerWorker(object):
try:
resp = self.push_record(task, key=key)
except TimeoutError:
- self.counts['timeout'] += 1
+ self.counts["timeout"] += 1
resp = self.timeout_response(task) # pylint: disable=assignment-from-none
# TODO: what if it is this push_record() itself that is timing out?
if resp and self.sink:
self.sink.push_record(resp)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
elif resp:
print(json.dumps(resp))
finally:
@@ -109,7 +115,7 @@ class SandcrawlerWorker(object):
TODO: should derived workers explicitly type-check the 'task' object?
"""
- raise NotImplementedError('implementation required')
+ raise NotImplementedError("implementation required")
class SandcrawlerFetchWorker(SandcrawlerWorker):
@@ -117,25 +123,26 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg,
PDFs) from wayback, archive.org, or other sources.
"""
+
def __init__(self, wayback_client: Optional[WaybackClient], **kwargs):
super().__init__(**kwargs)
self.wayback_client = wayback_client
def fetch_blob(self, record: Dict[str, Any]) -> Dict[str, Any]:
- default_key = record['sha1hex']
+ default_key = record["sha1hex"]
wayback_sec = None
petabox_sec = None
- if record.get('warc_path') and record.get('warc_offset'):
+ if record.get("warc_path") and record.get("warc_offset"):
# it's a full CDX dict. fetch using WaybackClient
if not self.wayback_client:
raise Exception("wayback client not configured for this SandcrawlerFetchWorker")
try:
start = time.time()
blob: bytes = self.wayback_client.fetch_petabox_body(
- csize=record['warc_csize'],
- offset=record['warc_offset'],
- warc_path=record['warc_path'],
+ csize=record["warc_csize"],
+ offset=record["warc_offset"],
+ warc_path=record["warc_path"],
)
wayback_sec = time.time() - start
except (WaybackError, WaybackContentError, PetaboxError, KeyError) as we:
@@ -145,15 +152,15 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
status="error-wayback",
error_msg=str(we),
)
- elif record.get('url') and record.get('datetime'):
+ elif record.get("url") and record.get("datetime"):
# it's a partial CDX dict or something? fetch using WaybackClient
if not self.wayback_client:
raise Exception("wayback client not configured for this SandcrawlerFetchWorker")
try:
start = time.time()
blob = self.wayback_client.fetch_replay_body(
- url=record['url'],
- datetime=record['datetime'],
+ url=record["url"],
+ datetime=record["datetime"],
)
wayback_sec = time.time() - start
except (WaybackError, WaybackContentError) as we:
@@ -163,11 +170,12 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
status="error-wayback",
error_msg=str(we),
)
- elif record.get('item') and record.get('path'):
+ elif record.get("item") and record.get("path"):
# it's a petabox link; fetch via HTTP
start = time.time()
- ia_resp = requests.get("https://archive.org/serve/{}/{}".format(
- record['item'], record['path']))
+ ia_resp = requests.get(
+ "https://archive.org/serve/{}/{}".format(record["item"], record["path"])
+ )
petabox_sec = time.time() - start
try:
ia_resp.raise_for_status()
@@ -181,7 +189,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
blob = ia_resp.content
else:
raise ValueError(
- "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+ "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed"
+ )
if not blob:
return dict(
key=default_key,
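
For reference, fetch_blob() dispatches on which keys are present in the record; roughly these three shapes are accepted (the field values below are made up for illustration):

# full CDX dict: fetched via WaybackClient.fetch_petabox_body()
full_cdx = {
    "sha1hex": "0000000000000000000000000000000000000000",
    "warc_path": "EXAMPLE-20200101000000-00000.warc.gz",
    "warc_offset": 12345,
    "warc_csize": 6789,
}

# partial CDX dict: fetched via WaybackClient.fetch_replay_body()
partial_cdx = {
    "sha1hex": "0000000000000000000000000000000000000000",
    "url": "https://example.com/paper.pdf",
    "datetime": "20200101120000",
}

# petabox item/path: fetched over HTTP from https://archive.org/serve/{item}/{path}
petabox_record = {
    "sha1hex": "0000000000000000000000000000000000000000",
    "item": "example-item",
    "path": "paper.pdf",
}
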
@@ -201,29 +210,31 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
class MultiprocessWrapper(SandcrawlerWorker):
- def __init__(self,
- worker: SandcrawlerWorker,
- sink: Optional[SandcrawlerWorker] = None,
- jobs: Optional[int] = None):
+ def __init__(
+ self,
+ worker: SandcrawlerWorker,
+ sink: Optional[SandcrawlerWorker] = None,
+ jobs: Optional[int] = None,
+ ):
self.counts = Counter()
self.worker = worker
self.sink = sink
self.pool = multiprocessing.pool.Pool(jobs)
def push_batch(self, tasks: List[Any]) -> List[Any]:
- self.counts['total'] += len(tasks)
+ self.counts["total"] += len(tasks)
print("... processing batch of: {}".format(len(tasks)), file=sys.stderr)
results = self.pool.map(self.worker.process, tasks)
for result in results:
if not result:
- self.counts['failed'] += 1
+ self.counts["failed"] += 1
return []
- elif type(result) == dict and 'status' in result and len(result['status']) < 32:
- self.counts[result['status']] += 1
+ elif type(result) == dict and "status" in result and len(result["status"]) < 32:
+ self.counts[result["status"]] += 1
if self.sink:
self.sink.push_record(result)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
else:
print(json.dumps(result))
return results
@@ -243,6 +254,7 @@ class BlackholeSink(SandcrawlerWorker):
Useful for tests.
"""
+
def push_record(self, task: Any, key: Optional[str] = None) -> Any:
return
@@ -257,12 +269,14 @@ class KafkaSink(SandcrawlerWorker):
self.produce_topic = produce_topic
self.kafka_hosts = kafka_hosts
- config = self.producer_config({
- 'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes
- 'api.version.request': True,
- 'api.version.fallback.ms': 0,
- })
+ config = self.producer_config(
+ {
+ "bootstrap.servers": kafka_hosts,
+ "message.max.bytes": 30000000, # ~30 MBytes; broker is ~50 MBytes
+ "api.version.request": True,
+ "api.version.fallback.ms": 0,
+ }
+ )
self.producer = Producer(config)
@staticmethod
@@ -275,27 +289,29 @@ class KafkaSink(SandcrawlerWorker):
def producer_config(self, kafka_config: dict) -> dict:
config = kafka_config.copy()
- config.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'message.timeout.ms': 30000,
- 'request.required.acks': -1, # all brokers must confirm
+ config.update(
+ {
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "message.timeout.ms": 30000,
+ "request.required.acks": -1, # all brokers must confirm
+ },
}
- })
+ )
return config
def push_record(self, msg: Any, key: Optional[str] = None) -> Any:
- self.counts['total'] += 1
+ self.counts["total"] += 1
if type(msg) == dict:
- if not key and 'key' in msg:
- key = msg['key']
+ if not key and "key" in msg:
+ key = msg["key"]
msg = json.dumps(msg)
if type(msg) == str:
- msg = msg.encode('utf-8')
+ msg = msg.encode("utf-8")
assert type(msg) == bytes
self.producer.produce(self.produce_topic, msg, key=key, on_delivery=self._fail_fast)
- self.counts['produced'] += 1
+ self.counts["produced"] += 1
# check for errors etc
self.producer.poll(0)
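
A hedged usage sketch of the sink; the broker address, topic name, and record contents are made up, and the constructor keywords follow the attributes assigned above.

from sandcrawler.workers import KafkaSink

sink = KafkaSink(kafka_hosts="localhost:9092", produce_topic="sandcrawler-qa.example-output")
# dict records are JSON-encoded; a 'key' field becomes the Kafka message key
sink.push_record({"key": "sha1:0000000000000000000000000000000000000000", "status": "success"})
sink.producer.flush()  # ensure buffered messages actually reach the broker
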
@@ -314,19 +330,22 @@ class KafkaCompressSink(KafkaSink):
"""
Variant of KafkaSink for large documents. Used for, eg, GROBID output.
"""
+
def producer_config(self, kafka_config: Dict[str, Any]) -> Dict[str, Any]:
config = kafka_config.copy()
- config.update({
- 'compression.codec': 'gzip',
- 'retry.backoff.ms': 250,
- 'linger.ms': 1000,
- 'batch.num.messages': 50,
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'message.timeout.ms': 30000,
- 'request.required.acks': -1, # all brokers must confirm
+ config.update(
+ {
+ "compression.codec": "gzip",
+ "retry.backoff.ms": 250,
+ "linger.ms": 1000,
+ "batch.num.messages": 50,
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "message.timeout.ms": 30000,
+ "request.required.acks": -1, # all brokers must confirm
+ },
}
- })
+ )
return config
@@ -335,6 +354,7 @@ class RecordPusher:
Base class for different record sources to be pushed into workers. Pretty
trivial interface, just wraps an importer and pushes records into it.
"""
+
def __init__(self, worker: SandcrawlerWorker, **kwargs):
self.counts: Counter = Counter()
self.worker: SandcrawlerWorker = worker
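
The pusher/worker split described above wires together roughly like this; the input filename is a placeholder and BlackholeSink merely stands in for a real processing worker.

import sys
from sandcrawler.workers import BlackholeSink, JsonLinePusher

worker = BlackholeSink()  # stand-in for a real SandcrawlerWorker subclass
with open("records.json", "r") as f:  # hypothetical file of JSON lines
    pusher = JsonLinePusher(worker, f, batch_size=None)
    pusher.run()  # parses each line as JSON and pushes it into the worker
print(pusher.counts, file=sys.stderr)
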
@@ -356,7 +376,7 @@ class JsonLinePusher(RecordPusher):
self.counts = Counter()
self.worker = worker
self.json_file = json_file
- self.batch_size = kwargs.get('batch_size', None)
+ self.batch_size = kwargs.get("batch_size", None)
if self.batch_size in (0, 1):
self.batch_size = None
@@ -365,24 +385,24 @@ class JsonLinePusher(RecordPusher):
for line in self.json_file:
if not line:
continue
- self.counts['total'] += 1
+ self.counts["total"] += 1
try:
record = json.loads(line)
except json.decoder.JSONDecodeError:
- self.counts['error-json-decode'] += 1
+ self.counts["error-json-decode"] += 1
continue
if self.batch_size:
batch.append(record)
if len(batch) >= self.batch_size:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
else:
self.worker.push_record(record)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
if self.batch_size and batch:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
self.worker.finish()
print("JSON lines pushed: {}".format(self.counts), file=sys.stderr)
@@ -394,10 +414,10 @@ class CdxLinePusher(RecordPusher):
self.counts = Counter()
self.worker = worker
self.cdx_file = cdx_file
- self.filter_http_statuses = kwargs.get('filter_http_statuses', None)
- self.filter_mimetypes = kwargs.get('filter_mimetypes', None)
- self.allow_octet_stream = kwargs.get('allow_octet_stream', False)
- self.batch_size = kwargs.get('batch_size', None)
+ self.filter_http_statuses = kwargs.get("filter_http_statuses", None)
+ self.filter_mimetypes = kwargs.get("filter_mimetypes", None)
+ self.allow_octet_stream = kwargs.get("allow_octet_stream", False)
+ self.batch_size = kwargs.get("batch_size", None)
if self.batch_size in (0, 1):
self.batch_size = None
@@ -406,30 +426,32 @@ class CdxLinePusher(RecordPusher):
for line in self.cdx_file:
if not line:
continue
- self.counts['total'] += 1
+ self.counts["total"] += 1
record = parse_cdx_line(line, normalize=True)
if not record:
- self.counts['skip-parse'] += 1
+ self.counts["skip-parse"] += 1
continue
- if self.filter_http_statuses and record[
- 'http_status'] not in self.filter_http_statuses:
- self.counts['skip-http_status'] += 1
+ if (
+ self.filter_http_statuses
+ and record["http_status"] not in self.filter_http_statuses
+ ):
+ self.counts["skip-http_status"] += 1
continue
- if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes:
- self.counts['skip-mimetype'] += 1
+ if self.filter_mimetypes and record["mimetype"] not in self.filter_mimetypes:
+ self.counts["skip-mimetype"] += 1
continue
if self.batch_size:
batch.append(record)
if len(batch) >= self.batch_size:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
else:
self.worker.push_record(record)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
if self.batch_size and batch:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
self.worker.finish()
print("CDX lines pushed: {}".format(self.counts), file=sys.stderr)
@@ -442,33 +464,33 @@ class ZipfilePusher(RecordPusher):
self.worker = worker
self.filter_suffix = ".pdf"
self.zipfile_path = zipfile_path
- self.batch_size = kwargs.get('batch_size', None)
+ self.batch_size = kwargs.get("batch_size", None)
if self.batch_size in (0, 1):
self.batch_size = None
def run(self) -> Counter:
batch = []
- with zipfile.ZipFile(self.zipfile_path, 'r') as archive:
+ with zipfile.ZipFile(self.zipfile_path, "r") as archive:
for zipinfo in archive.infolist():
if not zipinfo.filename.endswith(self.filter_suffix):
continue
- self.counts['total'] += 1
+ self.counts["total"] += 1
# NB doesn't really extract the file, just gives you a stream (file-like-object) for reading it
- flo = archive.open(zipinfo, 'r')
- data = flo.read(2**32)
+ flo = archive.open(zipinfo, "r")
+ data = flo.read(2 ** 32)
flo.close()
if self.batch_size:
batch.append(data)
if len(batch) >= self.batch_size:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
else:
self.worker.push_record(data)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
if self.batch_size and batch:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
self.worker.finish()
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
@@ -476,8 +498,14 @@ class ZipfilePusher(RecordPusher):
class KafkaJsonPusher(RecordPusher):
- def __init__(self, worker: SandcrawlerWorker, kafka_hosts: str, consume_topic: str,
- group: str, **kwargs):
+ def __init__(
+ self,
+ worker: SandcrawlerWorker,
+ kafka_hosts: str,
+ consume_topic: str,
+ group: str,
+ **kwargs
+ ):
self.counts = Counter()
self.worker = worker
self.consumer = make_kafka_consumer(
@@ -485,14 +513,14 @@ class KafkaJsonPusher(RecordPusher):
consume_topic,
group,
)
- self.push_batches = kwargs.get('push_batches', False)
- self.raw_records = kwargs.get('raw_records', False)
- self.poll_interval = kwargs.get('poll_interval', 5.0)
- self.batch_size = kwargs.get('batch_size', 100)
+ self.push_batches = kwargs.get("push_batches", False)
+ self.raw_records = kwargs.get("raw_records", False)
+ self.poll_interval = kwargs.get("poll_interval", 5.0)
+ self.batch_size = kwargs.get("batch_size", 100)
if self.batch_size in (0, 1):
self.batch_size = 1
- self.batch_worker = kwargs.get('batch_worker', False)
- self.process_timeout_sec = kwargs.get('process_timeout_sec', 300)
+ self.batch_worker = kwargs.get("batch_worker", False)
+ self.process_timeout_sec = kwargs.get("process_timeout_sec", 300)
def run(self) -> Counter:
while True:
@@ -502,11 +530,15 @@ class KafkaJsonPusher(RecordPusher):
# case where there is one update and thousands of creates;
# update would be lingering in worker, and if worker crashed
# never created. Not great.
- batch = self.consumer.consume(num_messages=self.batch_size,
- timeout=self.poll_interval)
- print("... got {} kafka messages ({}sec poll interval)".format(
- len(batch), self.poll_interval),
- file=sys.stderr)
+ batch = self.consumer.consume(
+ num_messages=self.batch_size, timeout=self.poll_interval
+ )
+ print(
+ "... got {} kafka messages ({}sec poll interval)".format(
+ len(batch), self.poll_interval
+ ),
+ file=sys.stderr,
+ )
if not batch:
# TODO: could have some larger timeout here and
# self.worker.finish() if it's been more than, eg, a couple
@@ -518,14 +550,14 @@ class KafkaJsonPusher(RecordPusher):
raise KafkaException(msg.error())
# ... then process
if self.push_batches:
- self.counts['total'] += len(batch)
- records = [json.loads(msg.value().decode('utf-8')) for msg in batch]
+ self.counts["total"] += len(batch)
+ records = [json.loads(msg.value().decode("utf-8")) for msg in batch]
self.worker.push_batch(records)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
else:
for msg in batch:
- self.counts['total'] += 1
+ self.counts["total"] += 1
if self.raw_records:
# In this mode, pass the Kafka message as bytes through
# without decoding as JSON. Eg, for thumbnails (where
@@ -533,7 +565,7 @@ class KafkaJsonPusher(RecordPusher):
# from the message)
record = msg.value()
else:
- record = json.loads(msg.value().decode('utf-8'))
+ record = json.loads(msg.value().decode("utf-8"))
# This complex bit of code implements backoff/backpressure
# in a way that will not cause this Kafka consumer to lose
# partition assignments (resulting in a rebalance). This
@@ -543,9 +575,9 @@ class KafkaJsonPusher(RecordPusher):
while not done:
try:
# use timeouts; don't want kafka itself to timeout
- self.worker.push_record_timeout(record,
- key=msg.key(),
- timeout=self.process_timeout_sec)
+ self.worker.push_record_timeout(
+ record, key=msg.key(), timeout=self.process_timeout_sec
+ )
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))
@@ -557,8 +589,8 @@ class KafkaJsonPusher(RecordPusher):
assert not empty_batch
time.sleep(5)
self.consumer.resume(self.consumer.assignment())
- self.counts['pushed'] += 1
- if self.counts['total'] % 500 == 0:
+ self.counts["pushed"] += 1
+ if self.counts["total"] % 500 == 0:
print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
for msg in batch:
# locally store offsets of processed messages; will be
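
A standalone sketch of the pause/poll/resume backoff pattern used above, assuming consumer is a confluent_kafka.Consumer and process(record) may raise SandcrawlerBackoffError; the wrapper function itself is illustrative, not part of this commit.

import time

from sandcrawler.ia import SandcrawlerBackoffError

def push_with_backoff(consumer, record, process, backoff_sec=200):
    while True:
        try:
            process(record)
            return
        except SandcrawlerBackoffError as be:
            print("Backing off for {} seconds: {}".format(backoff_sec, be))
            # keep polling with partitions paused so the broker does not
            # rebalance this consumer away while we wait
            consumer.pause(consumer.assignment())
            for _ in range(int(backoff_sec / 5)):
                empty_batch = consumer.poll(0)
                assert not empty_batch
                time.sleep(5)
            consumer.resume(consumer.assignment())
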
@@ -589,25 +621,25 @@ def make_kafka_consumer(hosts: str, consume_topic: str, group: str) -> Consumer:
print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(p.error)
- #print("Kafka consumer commit successful")
+ # print("Kafka consumer commit successful")
pass
# previously, using pykafka
- #auto_commit_enable=True,
- #auto_commit_interval_ms=30000, # 30 seconds
+ # auto_commit_enable=True,
+ # auto_commit_interval_ms=30000, # 30 seconds
conf = {
- 'bootstrap.servers': hosts,
- 'group.id': group,
- 'on_commit': fail_fast,
+ "bootstrap.servers": hosts,
+ "group.id": group,
+ "on_commit": fail_fast,
# messages don't have offset marked as stored until processed,
# but we do auto-commit stored offsets to broker
- 'enable.auto.offset.store': False,
- 'enable.auto.commit': True,
+ "enable.auto.offset.store": False,
+ "enable.auto.commit": True,
# user code timeout; if no poll after this long, assume user code
# hung and rebalance (default: 6min)
- 'max.poll.interval.ms': 360000,
- 'default.topic.config': {
- 'auto.offset.reset': 'latest',
+ "max.poll.interval.ms": 360000,
+ "default.topic.config": {
+ "auto.offset.reset": "latest",
},
}
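
A minimal sketch of the manual offset-store pattern these two settings enable: an offset is only stored after its message is fully processed, and the client's auto-commit then commits whatever has been stored. The broker address, group, topic, and handle() are illustrative.

from confluent_kafka import Consumer

def handle(msg):
    print(len(msg.value()))  # stand-in for real processing

conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "enable.auto.offset.store": False,  # nothing stored until we say so
    "enable.auto.commit": True,         # stored offsets get committed for us
}
consumer = Consumer(conf)
consumer.subscribe(["example-topic"])
while True:
    msg = consumer.poll(5.0)
    if msg is None or msg.error():
        continue
    handle(msg)
    consumer.store_offsets(message=msg)  # only now eligible for auto-commit
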
@@ -615,8 +647,9 @@ def make_kafka_consumer(hosts: str, consume_topic: str, group: str) -> Consumer:
for p in partitions:
if p.error:
raise KafkaException(p.error)
- print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions),
- file=sys.stderr)
+ print(
+ "Kafka partitions rebalanced: {} / {}".format(consumer, partitions), file=sys.stderr
+ )
consumer = Consumer(conf)
# NOTE: it's actually important that topic_name *not* be bytes (UTF-8