summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/elasticsearch.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2019-09-19 17:11:47 -0700
committerBryan Newbold <bnewbold@robocracy.org>2019-09-20 11:21:11 -0700
commit80dc9bab9c6e40cdde95f9e9c7fad13ca64b0769 (patch)
tree61e14bd3a21bd3dbf70a8a54c488cf09c7c2f11f /python/fatcat_tools/workers/elasticsearch.py
parent6183e95e9739a6fbf0d8cd77603d075e87804abb (diff)
downloadfatcat-80dc9bab9c6e40cdde95f9e9c7fad13ca64b0769.tar.gz
fatcat-80dc9bab9c6e40cdde95f9e9c7fad13ca64b0769.zip
small fixes to confluent-kafka importers/workers
- decrease default changelog pipeline to 5.0sec
- fix missing KafkaException harvester imports
- more confluent-kafka tweaks
- updates to kafka consumer configs
- bump elastic updates consumergroup (again)
Diffstat (limited to 'python/fatcat_tools/workers/elasticsearch.py')
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py33
1 file changed, 30 insertions(+), 3 deletions(-)
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 547e270c..acb705c2 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -22,7 +22,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
batch_size=200):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic)
- self.consumer_group = "elasticsearch-updates"
+ self.consumer_group = "elasticsearch-updates3"
self.batch_size = batch_size
self.poll_interval = poll_interval
self.elasticsearch_backend = elasticsearch_backend
@@ -34,6 +34,22 @@ class ElasticsearchReleaseWorker(FatcatWorker):
def run(self):
ac = ApiClient()
+ def fail_fast(err, partitions):
+ if err is not None:
+ print("Kafka consumer commit error: {}".format(err))
+ print("Bailing out...")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ for p in partitions:
+ # check for partition-specific commit errors
+ if p.error:
+ print("Kafka consumer commit error: {}".format(p.error))
+ print("Bailing out...")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ #print("Kafka consumer commit successful")
+ pass
+
def on_rebalance(consumer, partitions):
for p in partitions:
if p.error:
@@ -44,7 +60,15 @@ class ElasticsearchReleaseWorker(FatcatWorker):
consumer_conf = self.kafka_config.copy()
consumer_conf.update({
'group.id': self.consumer_group,
+ 'on_commit': fail_fast,
+ 'delivery.report.only.error': True,
+ # messages don't have offset marked as stored until pushed to
+ # elastic, but we do auto-commit stored offsets to broker
+ 'enable.auto.commit': True,
'enable.auto.offset.store': False,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 5min)
+ 'max.poll.interval.ms': 60000,
'default.topic.config': {
'auto.offset.reset': 'latest',
},
@@ -92,7 +116,10 @@ class ElasticsearchReleaseWorker(FatcatWorker):
print(desc)
print(resp.content)
raise Exception(desc)
- consumer.store_offsets(batch[-1])
+ for msg in batch:
+ # offsets are *committed* (to brokers) automatically, but need
+ # to be marked as processed here
+ consumer.store_offsets(message=msg)
@@ -109,7 +136,7 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
elasticsearch_index=elasticsearch_index,
batch_size=batch_size)
# previous group got corrupted (by pykafka library?)
- self.consumer_group = "elasticsearch-updates2"
+ self.consumer_group = "elasticsearch-updates3"
self.entity_type = ContainerEntity
self.elasticsearch_document_name = "container"
self.transform_func = container_to_elasticsearch