Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--  python/sandcrawler/workers.py  60
1 files changed, 25 insertions, 35 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index d8a4016..7135f4c 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -1,4 +1,3 @@
-
import json
import multiprocessing.pool
import signal
@@ -21,7 +20,6 @@ class SandcrawlerWorker(object):
Usually these get "pushed" into by a RecordPusher. Output goes to another
worker (pipeline-style), or defaults to stdout.
"""
-
def __init__(self):
self.counts = Counter()
self.sink = None
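For reference, the pipeline style described in this docstring looks roughly like the following minimal sketch; the worker and sink classes here are hypothetical stand-ins, not classes from workers.py:

    import json
    import sys
    from collections import Counter

    class PrintSink:
        """Hypothetical terminal sink: prints each record as a JSON line."""
        def push_record(self, task, key=None):
            print(json.dumps(task), file=sys.stdout)

    class AnnotateWorker:
        """Hypothetical worker: tags each record, then forwards it to its sink."""
        def __init__(self, sink=None):
            self.counts = Counter()
            self.sink = sink

        def push_record(self, task, key=None):
            task = dict(task, status="processed")
            self.counts['total'] += 1
            if self.sink:
                self.sink.push_record(task, key=key)
            return task

    # A RecordPusher-style caller just pushes records in one at a time.
    worker = AnnotateWorker(sink=PrintSink())
    worker.push_record({"sha1hex": "abc123"})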
@@ -62,9 +60,9 @@ class SandcrawlerWorker(object):
multithreading or if signal-based timeouts are used elsewhere in the
same process.
"""
-
def timeout_handler(signum, frame):
raise TimeoutError("timeout processing record")
+
signal.signal(signal.SIGALRM, timeout_handler)
resp = None
signal.alarm(int(timeout))
@@ -72,7 +70,7 @@ class SandcrawlerWorker(object):
resp = self.push_record(task, key=key)
except TimeoutError:
self.counts['timeout'] += 1
- resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+ resp = self.timeout_response(task) # pylint: disable=assignment-from-none
# TODO: what if it is this push_record() itself that is timing out?
if resp and self.sink:
self.sink.push_record(resp)
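For reference, the SIGALRM pattern used by push_record_timeout() can be shown in isolation. As the docstring above warns, it is Unix-only and unsafe with multithreading; process_record() below is a hypothetical stand-in for push_record():

    import signal

    def process_record(task):
        """Hypothetical slow operation standing in for push_record()."""
        return {"key": task.get("key"), "status": "ok"}

    def run_with_timeout(task, timeout=300):
        def timeout_handler(signum, frame):
            raise TimeoutError("timeout processing record")

        # SIGALRM is delivered to the main thread; other uses of the alarm
        # signal in the same process will conflict with this handler.
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(int(timeout))
        try:
            return process_record(task)
        except TimeoutError:
            return {"key": task.get("key"), "status": "timeout"}
        finally:
            signal.alarm(0)  # always clear the pending alarm

    print(run_with_timeout({"key": "abc123"}, timeout=5))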
@@ -113,7 +111,6 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg,
PDFs) from wayback, archive.org, or other sources.
"""
-
def __init__(self, wayback_client, **kwargs):
super().__init__(**kwargs)
self.wayback_client = wayback_client
@@ -178,7 +175,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
)
blob = resp.content
else:
- raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+ raise ValueError(
+ "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
if not blob:
return dict(
key=default_key,
@@ -192,8 +190,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
blob=blob,
)
-class MultiprocessWrapper(SandcrawlerWorker):
+class MultiprocessWrapper(SandcrawlerWorker):
def __init__(self, worker, sink, jobs=None):
self.counts = Counter()
self.worker = worker
@@ -226,21 +224,21 @@ class MultiprocessWrapper(SandcrawlerWorker):
print("Multiprocessing: {}".format(self.counts), file=sys.stderr)
return worker_counts
+
class BlackholeSink(SandcrawlerWorker):
"""
Dummy SandcrawlerWorker that doesn't do or process anything.
Useful for tests.
"""
-
def push_record(self, task, key=None):
return
def push_batch(self, tasks):
return
-class KafkaSink(SandcrawlerWorker):
+class KafkaSink(SandcrawlerWorker):
def __init__(self, kafka_hosts, produce_topic, **kwargs):
self.sink = None
self.counts = Counter()
@@ -249,13 +247,12 @@ class KafkaSink(SandcrawlerWorker):
config = self.producer_config({
'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes
+ 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes
'api.version.request': True,
'api.version.fallback.ms': 0,
})
self.producer = Producer(config)
-
@staticmethod
def _fail_fast(err, msg):
if err is not None:
@@ -270,7 +267,7 @@ class KafkaSink(SandcrawlerWorker):
'delivery.report.only.error': True,
'default.topic.config': {
'message.timeout.ms': 30000,
- 'request.required.acks': -1, # all brokers must confirm
+ 'request.required.acks': -1, # all brokers must confirm
}
})
return config
@@ -285,11 +282,7 @@ class KafkaSink(SandcrawlerWorker):
msg = msg.encode('utf-8')
assert type(msg) == bytes
- self.producer.produce(
- self.produce_topic,
- msg,
- key=key,
- on_delivery=self._fail_fast)
+ self.producer.produce(self.produce_topic, msg, key=key, on_delivery=self._fail_fast)
self.counts['produced'] += 1
# check for errors etc
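For context, the (now single-line) produce() call follows the usual confluent-kafka pattern: an asynchronous produce() plus a fail-fast delivery callback. A self-contained sketch, where the broker address and topic name are placeholder assumptions:

    from confluent_kafka import Producer

    def fail_fast(err, msg):
        # Delivery callback: raise rather than silently dropping messages.
        if err is not None:
            raise Exception("Kafka producer delivery error: {}".format(err))

    config = {
        'bootstrap.servers': 'localhost:9092',  # placeholder broker
        'message.max.bytes': 30000000,  # ~30 MBytes; broker is ~50 MBytes
        'api.version.request': True,
        'api.version.fallback.ms': 0,
        'delivery.report.only.error': True,
        'default.topic.config': {
            'message.timeout.ms': 30000,
            'request.required.acks': -1,  # all brokers must confirm
        },
    }
    producer = Producer(config)
    producer.produce('example-topic', b'{"hello": "world"}',
                     key='example-key', on_delivery=fail_fast)
    producer.poll(0)  # serve any pending delivery callbacks
    producer.flush()  # block until outstanding messages are delivered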
@@ -308,7 +301,6 @@ class KafkaCompressSink(KafkaSink):
"""
Variant of KafkaSink for large documents. Used for, eg, GROBID output.
"""
-
def producer_config(self, kafka_config):
config = kafka_config.copy()
config.update({
@@ -319,7 +311,7 @@ class KafkaCompressSink(KafkaSink):
'delivery.report.only.error': True,
'default.topic.config': {
'message.timeout.ms': 30000,
- 'request.required.acks': -1, # all brokers must confirm
+ 'request.required.acks': -1, # all brokers must confirm
}
})
return config
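The compression settings that distinguish KafkaCompressSink sit outside this hunk; as a hedged illustration only (values assumed, not read from this diff), librdkafka's compression.codec property is the usual knob for large payloads such as GROBID output:

    # Hedged sketch of producer overrides for a large-document sink; the
    # concrete values are assumptions, not taken from this diff.
    large_doc_config = {
        'compression.codec': 'gzip',  # shrink large payloads on the wire
        'message.max.bytes': 30000000,  # stay under the broker limit
        'delivery.report.only.error': True,
        'default.topic.config': {
            'message.timeout.ms': 30000,
            'request.required.acks': -1,  # all brokers must confirm
        },
    }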
@@ -330,7 +322,6 @@ class RecordPusher:
Base class for different record sources to be pushed into workers. Pretty
trivial interface, just wraps an importer and pushes records in to it.
"""
-
def __init__(self, worker, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -348,7 +339,6 @@ class RecordPusher:
class JsonLinePusher(RecordPusher):
-
def __init__(self, worker, json_file, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -387,7 +377,6 @@ class JsonLinePusher(RecordPusher):
class CdxLinePusher(RecordPusher):
-
def __init__(self, worker, cdx_file, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -409,7 +398,8 @@ class CdxLinePusher(RecordPusher):
if not record:
self.counts['skip-parse'] += 1
continue
- if self.filter_http_statuses and record['http_status'] not in self.filter_http_statuses:
+ if self.filter_http_statuses and record[
+ 'http_status'] not in self.filter_http_statuses:
self.counts['skip-http_status'] += 1
continue
if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes:
@@ -434,7 +424,6 @@ class CdxLinePusher(RecordPusher):
class ZipfilePusher(RecordPusher):
-
def __init__(self, worker, zipfile_path, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -472,8 +461,8 @@ class ZipfilePusher(RecordPusher):
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
-class KafkaJsonPusher(RecordPusher):
+class KafkaJsonPusher(RecordPusher):
def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -499,12 +488,11 @@ class KafkaJsonPusher(RecordPusher):
# case where there is one update and thousands of creates;
# update would be lingering in worker, and if worker crashed
# never created. Not great.
- batch = self.consumer.consume(
- num_messages=self.batch_size,
- timeout=self.poll_interval)
+ batch = self.consumer.consume(num_messages=self.batch_size,
+ timeout=self.poll_interval)
print("... got {} kafka messages ({}sec poll interval)".format(
- len(batch), self.poll_interval),
- file=sys.stderr)
+ len(batch), self.poll_interval),
+ file=sys.stderr)
if not batch:
# TODO: could have some larger timeout here and
# self.worker.finish() if it's been more than, eg, a couple
@@ -541,7 +529,9 @@ class KafkaJsonPusher(RecordPusher):
while not done:
try:
# use timeouts; don't want kafka itself to timeout
- self.worker.push_record_timeout(record, key=msg.key(), timeout=self.process_timeout_sec)
+ self.worker.push_record_timeout(record,
+ key=msg.key(),
+ timeout=self.process_timeout_sec)
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))
@@ -611,14 +601,14 @@ def make_kafka_consumer(hosts, consume_topic, group):
for p in partitions:
if p.error:
raise KafkaException(p.error)
- print("Kafka partitions rebalanced: {} / {}".format(
- consumer, partitions),
- file=sys.stderr)
+ print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions),
+ file=sys.stderr)
consumer = Consumer(conf)
# NOTE: it's actually important that topic_name *not* be bytes (UTF-8
# encoded)
- consumer.subscribe([topic_name],
+ consumer.subscribe(
+ [topic_name],
on_assign=on_rebalance,
on_revoke=on_rebalance,
)
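For completeness, a minimal confluent-kafka consumer built in this style; the conf dict shown here is an assumption (the real one sits outside this hunk), and the broker, topic, and group names are placeholders:

    from confluent_kafka import Consumer, KafkaException

    def on_rebalance(consumer, partitions):
        for p in partitions:
            if p.error:
                raise KafkaException(p.error)
        print("Kafka partitions rebalanced: {}".format(partitions))

    # Assumed settings; the actual `conf` construction is outside this hunk.
    conf = {
        'bootstrap.servers': 'localhost:9092',  # placeholder broker
        'group.id': 'example-group',
        'enable.auto.commit': False,
        'auto.offset.reset': 'latest',
    }
    consumer = Consumer(conf)
    # topic name must be a str, not bytes (see the NOTE above)
    consumer.subscribe(['example-topic'],
                       on_assign=on_rebalance,
                       on_revoke=on_rebalance)
    msg = consumer.poll(timeout=1.0)
    if msg is not None and not msg.error():
        print(msg.value())
    consumer.close()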