aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2019-09-19 17:11:47 -0700
committerBryan Newbold <bnewbold@robocracy.org>2019-09-20 11:21:11 -0700
commit80dc9bab9c6e40cdde95f9e9c7fad13ca64b0769 (patch)
tree61e14bd3a21bd3dbf70a8a54c488cf09c7c2f11f /python/fatcat_tools/workers
parent6183e95e9739a6fbf0d8cd77603d075e87804abb (diff)
downloadfatcat-80dc9bab9c6e40cdde95f9e9c7fad13ca64b0769.tar.gz
fatcat-80dc9bab9c6e40cdde95f9e9c7fad13ca64b0769.zip
small fixes to confluent-kafka importers/workers
- decrease default changelog pipeline to 5.0sec - fix missing KafkaException harvester imports - more confluent-kafka tweaks - updates to kafka consumer configs - bump elastic updates consumergroup (again)
Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r--python/fatcat_tools/workers/changelog.py14
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py33
-rw-r--r--python/fatcat_tools/workers/worker_common.py6
3 files changed, 41 insertions, 12 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index 4a54c649..c134bde2 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -13,7 +13,6 @@ class ChangelogWorker(FatcatWorker):
"""
def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
- # TODO: should be offset=0
super().__init__(kafka_hosts=kafka_hosts,
produce_topic=produce_topic,
api=api)
@@ -118,7 +117,15 @@ class EntityUpdatesWorker(FatcatWorker):
consumer_conf = self.kafka_config.copy()
consumer_conf.update({
'group.id': self.consumer_group,
+ 'on_commit': fail_fast,
+ 'delivery.report.only.error': True,
+ # messages don't have offset marked as stored until pushed to
+ # elastic, but we do auto-commit stored offsets to broker
+ 'enable.auto.commit': True,
'enable.auto.offset.store': False,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 5min)
+ 'max.poll.interval.ms': 180000,
'default.topic.config': {
'auto.offset.reset': 'latest',
},
@@ -142,8 +149,7 @@ class EntityUpdatesWorker(FatcatWorker):
while True:
msg = consumer.poll(self.poll_interval)
if not msg:
- print("nothing new from kafka (interval:{})".format(self.poll_interval))
- consumer.commit()
+ print("nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval))
continue
if msg.error():
raise KafkaException(msg.error())
@@ -202,5 +208,5 @@ class EntityUpdatesWorker(FatcatWorker):
on_delivery=fail_fast,
)
# TODO: actually update works
- consumer.store_offsets(msg)
+ consumer.store_offsets(message=msg)
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 547e270c..acb705c2 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -22,7 +22,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
batch_size=200):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic)
- self.consumer_group = "elasticsearch-updates"
+ self.consumer_group = "elasticsearch-updates3"
self.batch_size = batch_size
self.poll_interval = poll_interval
self.elasticsearch_backend = elasticsearch_backend
@@ -34,6 +34,22 @@ class ElasticsearchReleaseWorker(FatcatWorker):
def run(self):
ac = ApiClient()
+ def fail_fast(err, partitions):
+ if err is not None:
+ print("Kafka consumer commit error: {}".format(err))
+ print("Bailing out...")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ for p in partitions:
+ # check for partition-specific commit errors
+ if p.error:
+ print("Kafka consumer commit error: {}".format(p.error))
+ print("Bailing out...")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ #print("Kafka consumer commit successful")
+ pass
+
def on_rebalance(consumer, partitions):
for p in partitions:
if p.error:
@@ -44,7 +60,15 @@ class ElasticsearchReleaseWorker(FatcatWorker):
consumer_conf = self.kafka_config.copy()
consumer_conf.update({
'group.id': self.consumer_group,
+ 'on_commit': fail_fast,
+ 'delivery.report.only.error': True,
+ # messages don't have offset marked as stored until pushed to
+ # elastic, but we do auto-commit stored offsets to broker
+ 'enable.auto.commit': True,
'enable.auto.offset.store': False,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 5min)
+ 'max.poll.interval.ms': 60000,
'default.topic.config': {
'auto.offset.reset': 'latest',
},
@@ -92,7 +116,10 @@ class ElasticsearchReleaseWorker(FatcatWorker):
print(desc)
print(resp.content)
raise Exception(desc)
- consumer.store_offsets(batch[-1])
+ for msg in batch:
+ # offsets are *committed* (to brokers) automatically, but need
+ # to be marked as processed here
+ consumer.store_offsets(message=msg)
@@ -109,7 +136,7 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
elasticsearch_index=elasticsearch_index,
batch_size=batch_size)
# previous group got corrupted (by pykafka library?)
- self.consumer_group = "elasticsearch-updates2"
+ self.consumer_group = "elasticsearch-updates3"
self.entity_type = ContainerEntity
self.elasticsearch_document_name = "container"
self.transform_func = container_to_elasticsearch
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index fb8cfc19..ef79f528 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -66,11 +66,7 @@ class FatcatWorker:
self.api = api
self.kafka_config = {
'bootstrap.servers': kafka_hosts,
- 'delivery.report.only.error': True,
- 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
- 'default.topic.config': {
- 'request.required.acks': 'all',
- },
+ 'message.max.bytes': 20000000, # ~20 MBytes; broker-side max is ~50 MBytes
}
self.produce_topic = produce_topic
self.consume_topic = consume_topic