aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/sandcrawler/workers.py103
1 files changed, 57 insertions, 46 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 6b08f03..1b132ed 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -5,11 +5,13 @@ import sys
import time
import zipfile
from collections import Counter
+from typing import Any, Dict, List, Optional, Sequence
import requests
from confluent_kafka import Consumer, KafkaException, Producer
-from .ia import PetaboxError, SandcrawlerBackoffError, WaybackContentError, WaybackError
+from .ia import (PetaboxError, SandcrawlerBackoffError, WaybackClient, WaybackContentError,
+ WaybackError)
from .misc import parse_cdx_line
@@ -20,12 +22,11 @@ class SandcrawlerWorker(object):
Usually these get "pushed" into by a RecordPusher. Output goes to another
worker (pipeline-style), or defaults to stdout.
"""
- def __init__(self):
- self.counts = Counter()
- self.sink = None
- # TODO: self.counters
+ def __init__(self, sink: Optional['SandcrawlerWorker'] = None):
+ self.counts: Counter = Counter()
+ self.sink: Optional[SandcrawlerWorker] = sink
- def push_record(self, task, key=None):
+ def push_record(self, task: Any, key: Optional[str] = None) -> Any:
self.counts['total'] += 1
if not self.want(task):
self.counts['skip'] += 1
@@ -44,7 +45,7 @@ class SandcrawlerWorker(object):
print(json.dumps(result))
return result
- def timeout_response(self, task):
+ def timeout_response(self, task: Any) -> Any:
"""
This should be overridden by workers that want to return something
meaningful when there is a processing timeout. Eg, JSON vs some other
@@ -52,7 +53,10 @@ class SandcrawlerWorker(object):
"""
return None
- def push_record_timeout(self, task, key=None, timeout=300):
+ def push_record_timeout(self,
+ task: Any,
+ key: Optional[str] = None,
+ timeout: int = 300) -> Any:
"""
A wrapper around self.push_record which sets a timeout.
@@ -60,7 +64,7 @@ class SandcrawlerWorker(object):
multithreading or if signal-based timeouts are used elsewhere in the
same process.
"""
- def timeout_handler(signum, frame):
+ def timeout_handler(signum: int, frame: Any) -> None:
raise TimeoutError("timeout processing record")
signal.signal(signal.SIGALRM, timeout_handler)
@@ -81,27 +85,29 @@ class SandcrawlerWorker(object):
signal.alarm(0)
return resp
- def push_batch(self, tasks):
+ def push_batch(self, tasks: List[Any]) -> List[Any]:
results = []
for task in tasks:
results.append(self.push_record(task))
return results
- def finish(self):
+ def finish(self) -> Counter:
if self.sink:
self.sink.finish()
print("Worker: {}".format(self.counts), file=sys.stderr)
return self.counts
- def want(self, task):
+ def want(self, task: Any) -> bool:
"""
Optionally override this as a filter in implementations.
"""
return True
- def process(self, task, key=None):
+ def process(self, task: Any, key: str = None) -> Any:
"""
Derived workers need to implement business logic here.
+
+ TODO: should derived workers explicitly type-check the 'task' object?
"""
raise NotImplementedError('implementation required')
@@ -111,11 +117,11 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg,
PDFs) from wayback, archive.org, or other sources.
"""
- def __init__(self, wayback_client, **kwargs):
+ def __init__(self, wayback_client: WaybackClient, **kwargs):
super().__init__(**kwargs)
self.wayback_client = wayback_client
- def fetch_blob(self, record):
+ def fetch_blob(self, record: Dict[str, Any]) -> Dict[str, Any]:
default_key = record['sha1hex']
wayback_sec = None
petabox_sec = None
@@ -123,7 +129,7 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
if record.get('warc_path') and record.get('warc_offset'):
# it's a full CDX dict. fetch using WaybackClient
if not self.wayback_client:
- raise Exception("wayback client not configured for this PdfTrioWorker")
+ raise Exception("wayback client not configured for this SandcrawlerFetchWorker")
try:
start = time.time()
blob = self.wayback_client.fetch_petabox_body(
@@ -142,7 +148,7 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
elif record.get('url') and record.get('datetime'):
# it's a partial CDX dict or something? fetch using WaybackClient
if not self.wayback_client:
- raise Exception("wayback client not configured for this PdfTrioWorker")
+ raise Exception("wayback client not configured for this SandcrawlerFetchWorker")
try:
start = time.time()
blob = self.wayback_client.fetch_replay_body(
@@ -195,20 +201,23 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
class MultiprocessWrapper(SandcrawlerWorker):
- def __init__(self, worker, sink, jobs=None):
+ def __init__(self,
+ worker: SandcrawlerWorker,
+ sink: Optional[SandcrawlerWorker] = None,
+ jobs: Optional[int] = None):
self.counts = Counter()
self.worker = worker
self.sink = sink
self.pool = multiprocessing.pool.Pool(jobs)
- def push_batch(self, tasks):
+ def push_batch(self, tasks: List[Any]) -> List[Any]:
self.counts['total'] += len(tasks)
print("... processing batch of: {}".format(len(tasks)), file=sys.stderr)
results = self.pool.map(self.worker.process, tasks)
for result in results:
if not result:
self.counts['failed'] += 1
- return
+ return []
elif type(result) == dict and 'status' in result and len(result['status']) < 32:
self.counts[result['status']] += 1
@@ -219,7 +228,7 @@ class MultiprocessWrapper(SandcrawlerWorker):
print(json.dumps(result))
return results
- def finish(self):
+ def finish(self) -> Counter:
self.pool.terminate()
if self.sink:
self.sink.finish()
@@ -234,15 +243,15 @@ class BlackholeSink(SandcrawlerWorker):
Useful for tests.
"""
- def push_record(self, task, key=None):
+ def push_record(self, task: Any, key: Optional[str] = None) -> Any:
return
- def push_batch(self, tasks):
- return
+ def push_batch(self, tasks: List[Any]) -> List[Any]:
+ return []
class KafkaSink(SandcrawlerWorker):
- def __init__(self, kafka_hosts, produce_topic, **kwargs):
+ def __init__(self, kafka_hosts: str, produce_topic: str, **kwargs):
self.sink = None
self.counts = Counter()
self.produce_topic = produce_topic
@@ -257,14 +266,14 @@ class KafkaSink(SandcrawlerWorker):
self.producer = Producer(config)
@staticmethod
- def _fail_fast(err, msg):
+ def _fail_fast(err: Any, msg: Any) -> None:
if err is not None:
print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
- def producer_config(self, kafka_config):
+ def producer_config(self, kafka_config: dict) -> dict:
config = kafka_config.copy()
config.update({
'delivery.report.only.error': True,
@@ -275,7 +284,7 @@ class KafkaSink(SandcrawlerWorker):
})
return config
- def push_record(self, msg, key=None):
+ def push_record(self, msg: Any, key: Optional[str] = None) -> Any:
self.counts['total'] += 1
if type(msg) == dict:
if not key and 'key' in msg:
@@ -291,11 +300,12 @@ class KafkaSink(SandcrawlerWorker):
# check for errors etc
self.producer.poll(0)
- def push_batch(self, msgs):
+ def push_batch(self, msgs: List[Any]) -> List[Any]:
for m in msgs:
self.push_record(m)
+ return []
- def finish(self):
+ def finish(self) -> Counter:
self.producer.flush()
return self.counts
@@ -304,7 +314,7 @@ class KafkaCompressSink(KafkaSink):
"""
Variant of KafkaSink for large documents. Used for, eg, GROBID output.
"""
- def producer_config(self, kafka_config):
+ def producer_config(self, kafka_config: Dict[str, Any]) -> Dict[str, Any]:
config = kafka_config.copy()
config.update({
'compression.codec': 'gzip',
@@ -325,11 +335,11 @@ class RecordPusher:
Base class for different record sources to be pushed into workers. Pretty
trivial interface, just wraps an importer and pushes records in to it.
"""
- def __init__(self, worker, **kwargs):
- self.counts = Counter()
- self.worker = worker
+ def __init__(self, worker: SandcrawlerWorker, **kwargs):
+ self.counts: Counter = Counter()
+ self.worker: SandcrawlerWorker = worker
- def run(self):
+ def run(self) -> Counter:
"""
This will look something like:
@@ -342,7 +352,7 @@ class RecordPusher:
class JsonLinePusher(RecordPusher):
- def __init__(self, worker, json_file, **kwargs):
+ def __init__(self, worker: SandcrawlerWorker, json_file: Sequence, **kwargs):
self.counts = Counter()
self.worker = worker
self.json_file = json_file
@@ -350,7 +360,7 @@ class JsonLinePusher(RecordPusher):
if self.batch_size in (0, 1):
self.batch_size = None
- def run(self):
+ def run(self) -> Counter:
batch = []
for line in self.json_file:
if not line:
@@ -380,7 +390,7 @@ class JsonLinePusher(RecordPusher):
class CdxLinePusher(RecordPusher):
- def __init__(self, worker, cdx_file, **kwargs):
+ def __init__(self, worker: SandcrawlerWorker, cdx_file: Sequence, **kwargs):
self.counts = Counter()
self.worker = worker
self.cdx_file = cdx_file
@@ -391,7 +401,7 @@ class CdxLinePusher(RecordPusher):
if self.batch_size in (0, 1):
self.batch_size = None
- def run(self):
+ def run(self) -> Counter:
batch = []
for line in self.cdx_file:
if not line:
@@ -427,7 +437,7 @@ class CdxLinePusher(RecordPusher):
class ZipfilePusher(RecordPusher):
- def __init__(self, worker, zipfile_path, **kwargs):
+ def __init__(self, worker: SandcrawlerWorker, zipfile_path: str, **kwargs):
self.counts = Counter()
self.worker = worker
self.filter_suffix = ".pdf"
@@ -436,7 +446,7 @@ class ZipfilePusher(RecordPusher):
if self.batch_size in (0, 1):
self.batch_size = None
- def run(self):
+ def run(self) -> Counter:
batch = []
with zipfile.ZipFile(self.zipfile_path, 'r') as archive:
for zipinfo in archive.infolist():
@@ -466,7 +476,8 @@ class ZipfilePusher(RecordPusher):
class KafkaJsonPusher(RecordPusher):
- def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
+ def __init__(self, worker: SandcrawlerWorker, kafka_hosts: str, consume_topic: str,
+ group: str, **kwargs):
self.counts = Counter()
self.worker = worker
self.consumer = make_kafka_consumer(
@@ -483,7 +494,7 @@ class KafkaJsonPusher(RecordPusher):
self.batch_worker = kwargs.get('batch_worker', False)
self.process_timeout_sec = kwargs.get('process_timeout_sec', 300)
- def run(self):
+ def run(self) -> Counter:
while True:
# TODO: this is batch-oriented, because underlying worker is
# often batch-oriented, but this doesn't confirm that entire batch
@@ -562,10 +573,10 @@ class KafkaJsonPusher(RecordPusher):
return self.counts
-def make_kafka_consumer(hosts, consume_topic, group):
+def make_kafka_consumer(hosts: str, consume_topic: str, group: str) -> Consumer:
topic_name = consume_topic
- def fail_fast(err, partitions):
+ def fail_fast(err: Any, partitions: List[Any]) -> None:
if err is not None:
print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
print("Bailing out...", file=sys.stderr)
@@ -600,7 +611,7 @@ def make_kafka_consumer(hosts, consume_topic, group):
},
}
- def on_rebalance(consumer, partitions):
+ def on_rebalance(consumer: Any, partitions: List[Any]) -> None:
for p in partitions:
if p.error:
raise KafkaException(p.error)