Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--  python/sandcrawler/workers.py  | 418
1 file changed, 228 insertions(+), 190 deletions(-)
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 37e3d7a..356f050 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -1,16 +1,22 @@
-
-import sys
import json
-import time
+import multiprocessing.pool
import signal
+import sys
+import time
import zipfile
-import requests
-import multiprocessing.pool
from collections import Counter
-from confluent_kafka import Consumer, Producer, KafkaException
+from typing import Any, Dict, List, Optional, Sequence
-from .misc import parse_cdx_line
-from .ia import SandcrawlerBackoffError, WaybackError, WaybackContentError, PetaboxError
+from confluent_kafka import Consumer, KafkaException, Producer
+
+from .ia import (
+ PetaboxError,
+ SandcrawlerBackoffError,
+ WaybackClient,
+ WaybackContentError,
+ WaybackError,
+)
+from .misc import parse_cdx_line, requests_retry_session
class SandcrawlerWorker(object):
@@ -21,31 +27,30 @@ class SandcrawlerWorker(object):
worker (pipeline-style), or defaults to stdout.
"""
- def __init__(self):
- self.counts = Counter()
- self.sink = None
- # TODO: self.counters
+ def __init__(self, sink: Optional["SandcrawlerWorker"] = None):
+ self.counts: Counter = Counter()
+ self.sink: Optional[SandcrawlerWorker] = sink
- def push_record(self, task, key=None):
- self.counts['total'] += 1
+ def push_record(self, task: Any, key: Optional[str] = None) -> Any:
+ self.counts["total"] += 1
if not self.want(task):
- self.counts['skip'] += 1
+ self.counts["skip"] += 1
return
result = self.process(task, key=key)
if not result:
- self.counts['failed'] += 1
+ self.counts["failed"] += 1
return
- elif type(result) == dict and 'status' in result and len(result['status']) < 32:
- self.counts[result['status']] += 1
+ elif type(result) == dict and "status" in result and len(result["status"]) < 32:
+ self.counts[result["status"]] += 1
if self.sink:
self.sink.push_record(result)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
else:
print(json.dumps(result))
return result
- def timeout_response(self, task):
+ def timeout_response(self, task: Any) -> Any:
"""
This should be overridden by workers that want to return something
meaningful when there is a processing timeout. Eg, JSON vs some other
@@ -53,7 +58,9 @@ class SandcrawlerWorker(object):
"""
return None
- def push_record_timeout(self, task, key=None, timeout=300):
+ def push_record_timeout(
+ self, task: Any, key: Optional[str] = None, timeout: int = 300
+ ) -> Any:
"""
A wrapper around self.push_record which sets a timeout.
@@ -62,49 +69,52 @@ class SandcrawlerWorker(object):
same process.
"""
- def timeout_handler(signum, frame):
+ def timeout_handler(signum: int, frame: Any) -> None:
raise TimeoutError("timeout processing record")
+
signal.signal(signal.SIGALRM, timeout_handler)
resp = None
signal.alarm(int(timeout))
try:
resp = self.push_record(task, key=key)
except TimeoutError:
- self.counts['timeout'] += 1
- resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+ self.counts["timeout"] += 1
+ resp = self.timeout_response(task) # pylint: disable=assignment-from-none
# TODO: what if it is this push_record() itself that is timing out?
if resp and self.sink:
self.sink.push_record(resp)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
elif resp:
print(json.dumps(resp))
finally:
signal.alarm(0)
return resp
- def push_batch(self, tasks):
+ def push_batch(self, tasks: List[Any]) -> List[Any]:
results = []
for task in tasks:
results.append(self.push_record(task))
return results
- def finish(self):
+ def finish(self) -> Counter:
if self.sink:
self.sink.finish()
print("Worker: {}".format(self.counts), file=sys.stderr)
return self.counts
- def want(self, task):
+ def want(self, task: Any) -> bool:
"""
Optionally override this as a filter in implementations.
"""
return True
- def process(self, task, key=None):
+ def process(self, task: Any, key: Optional[str] = None) -> Any:
"""
Derived workers need to implement business logic here.
+
+ TODO: should derived workers explicitly type-check the 'task' object?
"""
- raise NotImplementedError('implementation required')
+ raise NotImplementedError("implementation required")
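For orientation: derived classes only need to override want() and process(); push_record() handles the counting, skipping, and sink forwarding shown above. A minimal sketch (the class and field choices here are illustrative, not part of this change):

    class ExampleWorker(SandcrawlerWorker):
        def want(self, task):
            # anything without a sha1hex gets counted as 'skip' by push_record()
            return bool(task.get("sha1hex"))

        def process(self, task, key=None):
            # a dict with a short 'status' string gets tallied in self.counts
            return dict(key=task["sha1hex"], status="success")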
class SandcrawlerFetchWorker(SandcrawlerWorker):
@@ -113,26 +123,26 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
PDFs) from wayback, archive.org, or other sources.
"""
- def __init__(self, wayback_client, **kwargs):
+ def __init__(self, wayback_client: Optional[WaybackClient], **kwargs):
super().__init__(**kwargs)
self.wayback_client = wayback_client
+ self.http_session = requests_retry_session()
- def fetch_blob(self, record):
- start_process = time.time()
- default_key = record['sha1hex']
+ def fetch_blob(self, record: Dict[str, Any]) -> Dict[str, Any]:
+ default_key = record["sha1hex"]
wayback_sec = None
petabox_sec = None
- if record.get('warc_path') and record.get('warc_offset'):
+ if record.get("warc_path") and record.get("warc_offset"):
# it's a full CDX dict. fetch using WaybackClient
if not self.wayback_client:
- raise Exception("wayback client not configured for this PdfTrioWorker")
+ raise Exception("wayback client not configured for this SandcrawlerFetchWorker")
try:
start = time.time()
- blob = self.wayback_client.fetch_petabox_body(
- csize=record['warc_csize'],
- offset=record['warc_offset'],
- warc_path=record['warc_path'],
+ blob: bytes = self.wayback_client.fetch_petabox_body(
+ csize=record["warc_csize"],
+ offset=record["warc_offset"],
+ warc_path=record["warc_path"],
)
wayback_sec = time.time() - start
except (WaybackError, WaybackContentError, PetaboxError, KeyError) as we:
@@ -142,15 +152,15 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
status="error-wayback",
error_msg=str(we),
)
- elif record.get('url') and record.get('datetime'):
+ elif record.get("url") and record.get("datetime"):
# it's a partial CDX dict or something? fetch using WaybackClient
if not self.wayback_client:
- raise Exception("wayback client not configured for this PdfTrioWorker")
+ raise Exception("wayback client not configured for this SandcrawlerFetchWorker")
try:
start = time.time()
blob = self.wayback_client.fetch_replay_body(
- url=record['url'],
- datetime=record['datetime'],
+ url=record["url"],
+ datetime=record["datetime"],
)
wayback_sec = time.time() - start
except (WaybackError, WaybackContentError) as we:
@@ -160,14 +170,15 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
status="error-wayback",
error_msg=str(we),
)
- elif record.get('item') and record.get('path'):
+ elif record.get("item") and record.get("path"):
# it's a petabox link; fetch via HTTP
start = time.time()
- resp = requests.get("https://archive.org/serve/{}/{}".format(
- record['item'], record['path']))
+ ia_resp = self.http_session.get(
+ "https://archive.org/serve/{}/{}".format(record["item"], record["path"])
+ )
petabox_sec = time.time() - start
try:
- resp.raise_for_status()
+ ia_resp.raise_for_status()
except Exception as e:
return dict(
key=default_key,
@@ -175,55 +186,67 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
status="error-petabox",
error_msg=str(e),
)
- blob = resp.content
+ blob = ia_resp.content
else:
- raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+ raise ValueError(
+ "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed"
+ )
if not blob:
return dict(
key=default_key,
source=record,
status="empty-blob",
+ wayback_sec=wayback_sec,
+ petabox_sec=petabox_sec,
)
return dict(
key=default_key,
status="success",
source=record,
blob=blob,
+ wayback_sec=wayback_sec,
+ petabox_sec=petabox_sec,
)
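The switch from a bare requests.get() to self.http_session, built by requests_retry_session() from .misc, adds retry behavior to the archive.org fetches. That helper's actual implementation is not shown in this diff; the usual shape of such a helper looks roughly like this (names and defaults are illustrative only):

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    def retry_session_sketch(retries=3, backoff_factor=1.0, status_forcelist=(500, 502, 503, 504)):
        # build a requests.Session that retries transient connection and 5xx errors
        session = requests.Session()
        retry = Retry(total=retries, read=retries, connect=retries,
                      backoff_factor=backoff_factor, status_forcelist=status_forcelist)
        adapter = HTTPAdapter(max_retries=retry)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session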
-class MultiprocessWrapper(SandcrawlerWorker):
- def __init__(self, worker, sink, jobs=None):
+class MultiprocessWrapper(SandcrawlerWorker):
+ def __init__(
+ self,
+ worker: SandcrawlerWorker,
+ sink: Optional[SandcrawlerWorker] = None,
+ jobs: Optional[int] = None,
+ ):
self.counts = Counter()
self.worker = worker
self.sink = sink
self.pool = multiprocessing.pool.Pool(jobs)
- def push_batch(self, tasks):
- self.counts['total'] += len(tasks)
+ def push_batch(self, tasks: List[Any]) -> List[Any]:
+ self.counts["total"] += len(tasks)
print("... processing batch of: {}".format(len(tasks)), file=sys.stderr)
results = self.pool.map(self.worker.process, tasks)
for result in results:
if not result:
- self.counts['failed'] += 1
- return
- elif type(result) == dict and 'status' in result and len(result['status']) < 32:
- self.counts[result['status']] += 1
+ self.counts["failed"] += 1
+ return []
+ elif type(result) == dict and "status" in result and len(result["status"]) < 32:
+ self.counts[result["status"]] += 1
if self.sink:
self.sink.push_record(result)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
else:
print(json.dumps(result))
return results
- def finish(self):
+ def finish(self) -> Counter:
self.pool.terminate()
if self.sink:
self.sink.finish()
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("Multiprocessing: {}".format(self.counts), file=sys.stderr)
- return worker_counts
+ return self.counts
+
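Note that MultiprocessWrapper only parallelizes push_batch() (via pool.map over the wrapped worker's process()); it defines no process() of its own, so the pusher in front of it must run with a batch_size. A hypothetical wiring (SomeCpuBoundWorker and the host/topic names are placeholders):

    sink = KafkaSink(kafka_hosts="localhost:9092", produce_topic="sandcrawler-out")
    wrapper = MultiprocessWrapper(SomeCpuBoundWorker(), sink=sink, jobs=4)
    JsonLinePusher(wrapper, open("tasks.json"), batch_size=50).run()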
class BlackholeSink(SandcrawlerWorker):
"""
@@ -232,73 +255,73 @@ class BlackholeSink(SandcrawlerWorker):
Useful for tests.
"""
- def push_record(self, task, key=None):
+ def push_record(self, task: Any, key: Optional[str] = None) -> Any:
return
- def push_batch(self, tasks):
- return
+ def push_batch(self, tasks: List[Any]) -> List[Any]:
+ return []
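In tests this lets a worker run end-to-end while discarding its output, e.g. (worker class and value are illustrative):

    worker = ExampleWorker(sink=BlackholeSink())
    worker.push_record({"sha1hex": "..."})  # counted in worker.counts, result discarded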
-class KafkaSink(SandcrawlerWorker):
- def __init__(self, kafka_hosts, produce_topic, **kwargs):
+class KafkaSink(SandcrawlerWorker):
+ def __init__(self, kafka_hosts: str, produce_topic: str, **kwargs):
self.sink = None
self.counts = Counter()
self.produce_topic = produce_topic
self.kafka_hosts = kafka_hosts
- config = self.producer_config({
- 'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes
- 'api.version.request': True,
- 'api.version.fallback.ms': 0,
- })
+ config = self.producer_config(
+ {
+ "bootstrap.servers": kafka_hosts,
+ "message.max.bytes": 30000000, # ~30 MBytes; broker is ~50 MBytes
+ "api.version.request": True,
+ "api.version.fallback.ms": 0,
+ }
+ )
self.producer = Producer(config)
-
@staticmethod
- def _fail_fast(err, msg):
+ def _fail_fast(err: Any, msg: Any) -> None:
if err is not None:
print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
- def producer_config(self, kafka_config):
+ def producer_config(self, kafka_config: dict) -> dict:
config = kafka_config.copy()
- config.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'message.timeout.ms': 30000,
- 'request.required.acks': -1, # all brokers must confirm
+ config.update(
+ {
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "message.timeout.ms": 30000,
+ "request.required.acks": -1, # all brokers must confirm
+ },
}
- })
+ )
return config
- def push_record(self, msg, key=None):
- self.counts['total'] += 1
+ def push_record(self, msg: Any, key: Optional[str] = None) -> Any:
+ self.counts["total"] += 1
if type(msg) == dict:
- if not key and 'key' in msg:
- key = msg['key']
+ if not key and "key" in msg:
+ key = msg["key"]
msg = json.dumps(msg)
if type(msg) == str:
- msg = msg.encode('utf-8')
+ msg = msg.encode("utf-8")
assert type(msg) == bytes
- self.producer.produce(
- self.produce_topic,
- msg,
- key=key,
- on_delivery=self._fail_fast)
- self.counts['produced'] += 1
+ self.producer.produce(self.produce_topic, msg, key=key, on_delivery=self._fail_fast)
+ self.counts["produced"] += 1
# check for errors etc
self.producer.poll(0)
- def push_batch(self, msgs):
+ def push_batch(self, msgs: List[Any]) -> List[Any]:
for m in msgs:
self.push_record(m)
+ return []
- def finish(self):
+ def finish(self) -> Counter:
self.producer.flush()
return self.counts
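Standalone use looks roughly like this (broker and topic names are placeholders). Because of delivery.report.only.error, _fail_fast only fires for failed deliveries, and the poll(0) after each produce() is what services those callbacks:

    sink = KafkaSink(kafka_hosts="localhost:9092", produce_topic="sandcrawler-ingest")
    sink.push_record({"key": "sha1:0123...", "status": "success"})
    sink.finish()  # flush() blocks until outstanding messages are delivered or fail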
@@ -308,19 +331,21 @@ class KafkaCompressSink(KafkaSink):
Variant of KafkaSink for large documents. Used for, eg, GROBID output.
"""
- def producer_config(self, kafka_config):
+ def producer_config(self, kafka_config: Dict[str, Any]) -> Dict[str, Any]:
config = kafka_config.copy()
- config.update({
- 'compression.codec': 'gzip',
- 'retry.backoff.ms': 250,
- 'linger.ms': 1000,
- 'batch.num.messages': 50,
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'message.timeout.ms': 30000,
- 'request.required.acks': -1, # all brokers must confirm
+ config.update(
+ {
+ "compression.codec": "gzip",
+ "retry.backoff.ms": 250,
+ "linger.ms": 1000,
+ "batch.num.messages": 50,
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "message.timeout.ms": 30000,
+ "request.required.acks": -1, # all brokers must confirm
+ },
}
- })
+ )
return config
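producer_config() is the extension hook here; further tuning can be layered the same way (hypothetical subclass, not part of this commit):

    class KafkaPatientSink(KafkaSink):
        def producer_config(self, kafka_config):
            config = super().producer_config(kafka_config)
            # allow slower brokers before timing out a message
            config["default.topic.config"]["message.timeout.ms"] = 120000
            return config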
@@ -330,11 +355,11 @@ class RecordPusher:
trivial interface, just wraps an importer and pushes records in to it.
"""
- def __init__(self, worker, **kwargs):
- self.counts = Counter()
- self.worker = worker
+ def __init__(self, worker: SandcrawlerWorker, **kwargs):
+ self.counts: Counter = Counter()
+ self.worker: SandcrawlerWorker = worker
- def run(self):
+ def run(self) -> Counter:
"""
This will look something like:
@@ -347,133 +372,140 @@ class RecordPusher:
class JsonLinePusher(RecordPusher):
-
- def __init__(self, worker, json_file, **kwargs):
+ def __init__(self, worker: SandcrawlerWorker, json_file: Sequence, **kwargs):
self.counts = Counter()
self.worker = worker
self.json_file = json_file
- self.batch_size = kwargs.get('batch_size', None)
+ self.batch_size = kwargs.get("batch_size", None)
if self.batch_size in (0, 1):
self.batch_size = None
- def run(self):
+ def run(self) -> Counter:
batch = []
for line in self.json_file:
if not line:
continue
- self.counts['total'] += 1
+ self.counts["total"] += 1
try:
record = json.loads(line)
except json.decoder.JSONDecodeError:
- self.counts['error-json-decode'] += 1
+ self.counts["error-json-decode"] += 1
continue
if self.batch_size:
batch.append(record)
if len(batch) >= self.batch_size:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
else:
self.worker.push_record(record)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
if self.batch_size and batch:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("JSON lines pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
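The pusher only requires one JSON object per line; the fields are whatever the downstream worker expects. For the wayback branch of SandcrawlerFetchWorker.fetch_blob() above, an input record might look roughly like this (values invented):

    {"sha1hex": "0123456789abcdef0123456789abcdef01234567", "warc_path": "example-crawl.warc.gz", "warc_offset": 123456, "warc_csize": 7890}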
class CdxLinePusher(RecordPusher):
-
- def __init__(self, worker, cdx_file, **kwargs):
+ def __init__(self, worker: SandcrawlerWorker, cdx_file: Sequence, **kwargs):
self.counts = Counter()
self.worker = worker
self.cdx_file = cdx_file
- self.filter_http_statuses = kwargs.get('filter_http_statuses', None)
- self.filter_mimetypes = kwargs.get('filter_mimetypes', None)
- self.allow_octet_stream = kwargs.get('allow_octet_stream', False)
- self.batch_size = kwargs.get('batch_size', None)
+ self.filter_http_statuses = kwargs.get("filter_http_statuses", None)
+ self.filter_mimetypes = kwargs.get("filter_mimetypes", None)
+ self.allow_octet_stream = kwargs.get("allow_octet_stream", False)
+ self.batch_size = kwargs.get("batch_size", None)
if self.batch_size in (0, 1):
self.batch_size = None
- def run(self):
+ def run(self) -> Counter:
batch = []
for line in self.cdx_file:
if not line:
continue
- self.counts['total'] += 1
+ self.counts["total"] += 1
record = parse_cdx_line(line, normalize=True)
if not record:
- self.counts['skip-parse'] += 1
+ self.counts["skip-parse"] += 1
continue
- if self.filter_http_statuses and record['http_status'] not in self.filter_http_statuses:
- self.counts['skip-http_status'] += 1
+ if (
+ self.filter_http_statuses
+ and record["http_status"] not in self.filter_http_statuses
+ ):
+ self.counts["skip-http_status"] += 1
continue
- if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes:
- self.counts['skip-mimetype'] += 1
+ if self.filter_mimetypes and record["mimetype"] not in self.filter_mimetypes:
+ self.counts["skip-mimetype"] += 1
continue
if self.batch_size:
batch.append(record)
if len(batch) >= self.batch_size:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
else:
self.worker.push_record(record)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
if self.batch_size and batch:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("CDX lines pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
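A typical wiring filters to successful PDF captures before pushing (file name and filter values are illustrative):

    pusher = CdxLinePusher(
        worker,
        open("crawl.cdx"),
        filter_http_statuses=[200],
        filter_mimetypes=["application/pdf"],
        batch_size=50,
    )
    pusher.run()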
class ZipfilePusher(RecordPusher):
-
- def __init__(self, worker, zipfile_path, **kwargs):
+ def __init__(self, worker: SandcrawlerWorker, zipfile_path: str, **kwargs):
self.counts = Counter()
self.worker = worker
self.filter_suffix = ".pdf"
self.zipfile_path = zipfile_path
- self.batch_size = kwargs.get('batch_size', None)
+ self.batch_size = kwargs.get("batch_size", None)
if self.batch_size in (0, 1):
self.batch_size = None
- def run(self):
+ def run(self) -> Counter:
batch = []
- with zipfile.ZipFile(self.zipfile_path, 'r') as archive:
+ with zipfile.ZipFile(self.zipfile_path, "r") as archive:
for zipinfo in archive.infolist():
if not zipinfo.filename.endswith(self.filter_suffix):
continue
- self.counts['total'] += 1
+ self.counts["total"] += 1
# NB doesn't really extract the file, just gives you a stream (file-like-object) for reading it
- flo = archive.open(zipinfo, 'r')
+ flo = archive.open(zipinfo, "r")
data = flo.read(2**32)
flo.close()
if self.batch_size:
batch.append(data)
if len(batch) >= self.batch_size:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
else:
self.worker.push_record(data)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
if self.batch_size and batch:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
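Unlike the other pushers, this one hands raw PDF bytes (not dicts) to the worker, so the worker's process() or push_batch() must accept a bytes blob, e.g. (worker and path illustrative):

    ZipfilePusher(pdf_worker, "crawled-pdfs.zip", batch_size=20).run()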
-class KafkaJsonPusher(RecordPusher):
- def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
+class KafkaJsonPusher(RecordPusher):
+ def __init__(
+ self,
+ worker: SandcrawlerWorker,
+ kafka_hosts: str,
+ consume_topic: str,
+ group: str,
+ **kwargs
+ ):
self.counts = Counter()
self.worker = worker
self.consumer = make_kafka_consumer(
@@ -481,29 +513,32 @@ class KafkaJsonPusher(RecordPusher):
consume_topic,
group,
)
- self.push_batches = kwargs.get('push_batches', False)
- self.raw_records = kwargs.get('raw_records', False)
- self.poll_interval = kwargs.get('poll_interval', 5.0)
- self.batch_size = kwargs.get('batch_size', 100)
+ self.push_batches = kwargs.get("push_batches", False)
+ self.raw_records = kwargs.get("raw_records", False)
+ self.poll_interval = kwargs.get("poll_interval", 5.0)
+ self.batch_size = kwargs.get("batch_size", 100)
if self.batch_size in (0, 1):
self.batch_size = 1
- self.batch_worker = kwargs.get('batch_worker', False)
- self.process_timeout_sec = kwargs.get('process_timeout_sec', 300)
+ self.batch_worker = kwargs.get("batch_worker", False)
+ self.process_timeout_sec = kwargs.get("process_timeout_sec", 300)
- def run(self):
+ def run(self) -> Counter:
while True:
# TODO: this is batch-oriented, because underlying worker is
# often batch-oriented, but this doesn't confirm that entire batch
- # has been pushed to fatcat before commiting offset. Eg, consider
+ # has been pushed to fatcat before committing offset. Eg, consider
# case where there is one update and thousands of creates;
# update would be lingering in worker, and if worker crashed
# never created. Not great.
batch = self.consumer.consume(
- num_messages=self.batch_size,
- timeout=self.poll_interval)
- print("... got {} kafka messages ({}sec poll interval)".format(
- len(batch), self.poll_interval),
- file=sys.stderr)
+ num_messages=self.batch_size, timeout=self.poll_interval
+ )
+ print(
+ "... got {} kafka messages ({}sec poll interval)".format(
+ len(batch), self.poll_interval
+ ),
+ file=sys.stderr,
+ )
if not batch:
# TODO: could have some larger timeout here and
# self.worker.finish() if it's been more than, eg, a couple
@@ -515,14 +550,14 @@ class KafkaJsonPusher(RecordPusher):
raise KafkaException(msg.error())
# ... then process
if self.push_batches:
- self.counts['total'] += len(batch)
- records = [json.loads(msg.value().decode('utf-8')) for msg in batch]
+ self.counts["total"] += len(batch)
+ records = [json.loads(msg.value().decode("utf-8")) for msg in batch]
self.worker.push_batch(records)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
else:
for msg in batch:
- self.counts['total'] += 1
+ self.counts["total"] += 1
if self.raw_records:
# In this mode, pass the Kafka message as bytes through
# without decoding as JSON. Eg, for thumbnails (where
@@ -530,7 +565,7 @@ class KafkaJsonPusher(RecordPusher):
# from the message)
record = msg.value()
else:
- record = json.loads(msg.value().decode('utf-8'))
+ record = json.loads(msg.value().decode("utf-8"))
# This complex bit of code implements backoff/backpressure
# in a way that will not cause this Kafka consumer to lose
# partition assignments (resulting in a rebalance). This
@@ -540,7 +575,9 @@ class KafkaJsonPusher(RecordPusher):
while not done:
try:
# use timeouts; don't want kafka itself to timeout
- self.worker.push_record_timeout(record, key=msg.key(), timeout=self.process_timeout_sec)
+ self.worker.push_record_timeout(
+ record, key=msg.key(), timeout=self.process_timeout_sec
+ )
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))
@@ -552,8 +589,8 @@ class KafkaJsonPusher(RecordPusher):
assert not empty_batch
time.sleep(5)
self.consumer.resume(self.consumer.assignment())
- self.counts['pushed'] += 1
- if self.counts['total'] % 500 == 0:
+ self.counts["pushed"] += 1
+ if self.counts["total"] % 500 == 0:
print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
for msg in batch:
# locally store offsets of processed messages; will be
@@ -562,16 +599,16 @@ class KafkaJsonPusher(RecordPusher):
# TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
# commit the current batch if it has been lingering
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("KafkaJson lines pushed: {}".format(self.counts), file=sys.stderr)
self.consumer.close()
return self.counts
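End-to-end consumer wiring looks roughly like this (broker, topic, and group names are placeholders):

    pusher = KafkaJsonPusher(
        worker,
        kafka_hosts="localhost:9092",
        consume_topic="sandcrawler-qa.ingest-file-requests",
        group="ingest-file-workers",
        batch_size=100,
        process_timeout_sec=300,
    )
    pusher.run()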
-def make_kafka_consumer(hosts, consume_topic, group):
+def make_kafka_consumer(hosts: str, consume_topic: str, group: str) -> Consumer:
topic_name = consume_topic
- def fail_fast(err, partitions):
+ def fail_fast(err: Any, partitions: List[Any]) -> None:
if err is not None:
print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
print("Bailing out...", file=sys.stderr)
@@ -584,40 +621,41 @@ def make_kafka_consumer(hosts, consume_topic, group):
print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(p.error)
- #print("Kafka consumer commit successful")
+ # print("Kafka consumer commit successful")
pass
# previously, using pykafka
- #auto_commit_enable=True,
- #auto_commit_interval_ms=30000, # 30 seconds
+ # auto_commit_enable=True,
+ # auto_commit_interval_ms=30000, # 30 seconds
conf = {
- 'bootstrap.servers': hosts,
- 'group.id': group,
- 'on_commit': fail_fast,
+ "bootstrap.servers": hosts,
+ "group.id": group,
+ "on_commit": fail_fast,
# messages don't have offset marked as stored until processed,
# but we do auto-commit stored offsets to broker
- 'enable.auto.offset.store': False,
- 'enable.auto.commit': True,
+ "enable.auto.offset.store": False,
+ "enable.auto.commit": True,
# user code timeout; if no poll after this long, assume user code
# hung and rebalance (default: 6min)
- 'max.poll.interval.ms': 360000,
- 'default.topic.config': {
- 'auto.offset.reset': 'latest',
+ "max.poll.interval.ms": 360000,
+ "default.topic.config": {
+ "auto.offset.reset": "latest",
},
}
- def on_rebalance(consumer, partitions):
+ def on_rebalance(consumer: Any, partitions: List[Any]) -> None:
for p in partitions:
if p.error:
raise KafkaException(p.error)
- print("Kafka partitions rebalanced: {} / {}".format(
- consumer, partitions),
- file=sys.stderr)
+ print(
+ "Kafka partitions rebalanced: {} / {}".format(consumer, partitions), file=sys.stderr
+ )
consumer = Consumer(conf)
# NOTE: it's actually important that topic_name *not* be bytes (UTF-8
# encoded)
- consumer.subscribe([topic_name],
+ consumer.subscribe(
+ [topic_name],
on_assign=on_rebalance,
on_revoke=on_rebalance,
)