diff options
Diffstat (limited to 'python/fatcat_tools/workers/elasticsearch.py')
-rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 22 |
1 files changed, 14 insertions, 8 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index acb705c2..2ba241eb 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -2,7 +2,7 @@ import json import time import requests -from confluent_kafka import Consumer, Producer, KafkaException +from confluent_kafka import Consumer, KafkaException from fatcat_openapi_client import ReleaseEntity, ContainerEntity, ApiClient from fatcat_tools import * @@ -61,7 +61,6 @@ class ElasticsearchReleaseWorker(FatcatWorker): consumer_conf.update({ 'group.id': self.consumer_group, 'on_commit': fail_fast, - 'delivery.report.only.error': True, # messages don't have offset marked as stored until pushed to # elastic, but we do auto-commit stored offsets to broker 'enable.auto.commit': True, @@ -97,16 +96,24 @@ class ElasticsearchReleaseWorker(FatcatWorker): bulk_actions = [] for msg in batch: json_str = msg.value().decode('utf-8') - entity = entity_from_json(json_str, ReleaseEntity, api_client=ac) - print("Upserting: release/{}".format(entity.ident)) + # HACK: work around a bug where container entities got published to + # release_v03 topic + if self.elasticsearch_document_name == "release": + entity_dict = json.loads(json_str) + if entity_dict.get('name') and not entity_dict.get('title'): + continue + entity = entity_from_json(json_str, self.entity_type, api_client=ac) + # TODO: handle deletions from index bulk_actions.append(json.dumps({ "index": { "_id": entity.ident, }, })) bulk_actions.append(json.dumps( - release_to_elasticsearch(entity))) - elasticsearch_endpoint = "{}/{}/release/_bulk".format( + self.transform_func(entity))) + print("Upserting, eg, {} (of {} releases in elasticsearch)".format(entity.ident, len(batch))) + elasticsearch_endpoint = "{}/{}/{}/_bulk".format( self.elasticsearch_backend, - self.elasticsearch_index) + self.elasticsearch_index, + self.elasticsearch_document_name) resp = requests.post(elasticsearch_endpoint, headers={"Content-Type": "application/x-ndjson"}, data="\n".join(bulk_actions) + "\n") @@ -141,4 +148,3 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): self.elasticsearch_document_name = "container" self.transform_func = container_to_elasticsearch - |