From 026e352f5d99652f088b6bcdc28d43106b8f52d2 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 17 Apr 2020 15:32:18 -0700 Subject: ES changelog worker: fixes for ident; fetch update from API if needed The API fetch update may be needed for old changelog entries in the kafka feed. --- python/fatcat_tools/workers/elasticsearch.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'python/fatcat_tools/workers/elasticsearch.py') diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 525f372b..15d7aae3 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -103,13 +103,20 @@ class ElasticsearchReleaseWorker(FatcatWorker): if entity_dict.get('name') and not entity_dict.get('title'): continue entity = entity_from_json(json_str, self.entity_type, api_client=ac) + if self.entity_type == "changelog": + key = entity.index + # might need to fetch from API + if not (entity.editgroup and entity.editgroup.editor): + entity = ac.get_changelog_entry(entity.index, expand="editgroup,editor") + else: + key = entity.ident # TODO: handle deletions from index bulk_actions.append(json.dumps({ - "index": { "_id": entity.ident, }, + "index": { "_id": key, }, })) bulk_actions.append(json.dumps( self.transform_func(entity))) - print("Upserting, eg, {} (of {} releases in elasticsearch)".format(entity.ident, len(batch))) + print("Upserting, eg, {} (of {} {} in elasticsearch)".format(entity.ident, len(batch), self.entity_type)) elasticsearch_endpoint = "{}/{}/{}/_bulk".format( self.elasticsearch_backend, self.elasticsearch_index, -- cgit v1.2.3 From 9e9f7f1da115458d87f8bcdc011b2843e2b31d3b Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 17 Apr 2020 16:29:40 -0700 Subject: more changelog ES fixes --- python/fatcat_tools/workers/elasticsearch.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'python/fatcat_tools/workers/elasticsearch.py') diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 15d7aae3..e58b3da1 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -19,7 +19,7 @@ class ElasticsearchReleaseWorker(FatcatWorker): def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat", - batch_size=200): + batch_size=200, api_host="https://api.fatcat.wiki/v0"): super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic) self.consumer_group = "elasticsearch-updates3" @@ -30,9 +30,11 @@ class ElasticsearchReleaseWorker(FatcatWorker): self.entity_type = ReleaseEntity self.elasticsearch_document_name = "release" self.transform_func = release_to_elasticsearch + self.api_host = api_host def run(self): ac = ApiClient() + api = public_api(self.api_host) def fail_fast(err, partitions): if err is not None: @@ -103,11 +105,11 @@ class ElasticsearchReleaseWorker(FatcatWorker): if entity_dict.get('name') and not entity_dict.get('title'): continue entity = entity_from_json(json_str, self.entity_type, api_client=ac) - if self.entity_type == "changelog": + if self.elasticsearch_document_name == "changelog": key = entity.index # might need to fetch from API if not (entity.editgroup and entity.editgroup.editor): - entity = ac.get_changelog_entry(entity.index, expand="editgroup,editor") + entity = api.get_changelog_entry(entity.index) else: key = entity.ident # TODO: handle deletions from index @@ -116,7 +118,7 @@ class ElasticsearchReleaseWorker(FatcatWorker): })) bulk_actions.append(json.dumps( self.transform_func(entity))) - print("Upserting, eg, {} (of {} {} in elasticsearch)".format(entity.ident, len(batch), self.entity_type)) + print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.elasticsearch_document_name)) elasticsearch_endpoint = "{}/{}/{}/_bulk".format( self.elasticsearch_backend, self.elasticsearch_index, -- cgit v1.2.3