aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/elasticsearch.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers/elasticsearch.py')
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py33
1 files changed, 30 insertions, 3 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 547e270c..acb705c2 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -22,7 +22,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
batch_size=200):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic)
- self.consumer_group = "elasticsearch-updates"
+ self.consumer_group = "elasticsearch-updates3"
self.batch_size = batch_size
self.poll_interval = poll_interval
self.elasticsearch_backend = elasticsearch_backend
@@ -34,6 +34,22 @@ class ElasticsearchReleaseWorker(FatcatWorker):
def run(self):
ac = ApiClient()
+ def fail_fast(err, partitions):
+ if err is not None:
+ print("Kafka consumer commit error: {}".format(err))
+ print("Bailing out...")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ for p in partitions:
+ # check for partition-specific commit errors
+ if p.error:
+ print("Kafka consumer commit error: {}".format(p.error))
+ print("Bailing out...")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ #print("Kafka consumer commit successful")
+ pass
+
def on_rebalance(consumer, partitions):
for p in partitions:
if p.error:
@@ -44,7 +60,15 @@ class ElasticsearchReleaseWorker(FatcatWorker):
consumer_conf = self.kafka_config.copy()
consumer_conf.update({
'group.id': self.consumer_group,
+ 'on_commit': fail_fast,
+ 'delivery.report.only.error': True,
+ # messages don't have offset marked as stored until pushed to
+ # elastic, but we do auto-commit stored offsets to broker
+ 'enable.auto.commit': True,
'enable.auto.offset.store': False,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 5min)
+ 'max.poll.interval.ms': 60000,
'default.topic.config': {
'auto.offset.reset': 'latest',
},
@@ -92,7 +116,10 @@ class ElasticsearchReleaseWorker(FatcatWorker):
print(desc)
print(resp.content)
raise Exception(desc)
- consumer.store_offsets(batch[-1])
+ for msg in batch:
+ # offsets are *committed* (to brokers) automatically, but need
+ # to be marked as processed here
+ consumer.store_offsets(message=msg)
@@ -109,7 +136,7 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
elasticsearch_index=elasticsearch_index,
batch_size=batch_size)
# previous group got corrupted (by pykafka library?)
- self.consumer_group = "elasticsearch-updates2"
+ self.consumer_group = "elasticsearch-updates3"
self.entity_type = ContainerEntity
self.elasticsearch_document_name = "container"
self.transform_func = container_to_elasticsearch