diff options
Diffstat (limited to 'python/fatcat_tools/workers/elasticsearch.py')
-rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 18 |
1 files changed, 4 insertions, 14 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 61854c31..4850bb0a 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -27,7 +27,6 @@ class ElasticsearchReleaseWorker(FatcatWorker): self.elasticsearch_backend = elasticsearch_backend self.elasticsearch_index = elasticsearch_index self.entity_type = ReleaseEntity - self.elasticsearch_document_name = "release" self.transform_func = release_to_elasticsearch self.api_host = api_host @@ -97,14 +96,8 @@ class ElasticsearchReleaseWorker(FatcatWorker): bulk_actions = [] for msg in batch: json_str = msg.value().decode('utf-8') - # HACK: work around a bug where container entities got published to - # release_v03 topic - if self.elasticsearch_document_name == "release": - entity_dict = json.loads(json_str) - if entity_dict.get('name') and not entity_dict.get('title'): - continue entity = entity_from_json(json_str, self.entity_type, api_client=ac) - if self.elasticsearch_document_name == "changelog": + if self.entity_type == ChangelogEntry: key = entity.index # might need to fetch from API if not (entity.editgroup and entity.editgroup.editor): @@ -117,11 +110,10 @@ class ElasticsearchReleaseWorker(FatcatWorker): })) bulk_actions.append(json.dumps( self.transform_func(entity))) - print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.elasticsearch_document_name)) - elasticsearch_endpoint = "{}/{}/{}/_bulk".format( + print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type)) + elasticsearch_endpoint = "{}/{}/_bulk".format( self.elasticsearch_backend, - self.elasticsearch_index, - self.elasticsearch_document_name) + self.elasticsearch_index) resp = requests.post(elasticsearch_endpoint, headers={"Content-Type": "application/x-ndjson"}, data="\n".join(bulk_actions) + "\n") @@ -152,7 +144,6 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker): # previous group got corrupted (by pykafka library?) self.consumer_group = "elasticsearch-updates3" self.entity_type = ContainerEntity - self.elasticsearch_document_name = "container" self.transform_func = container_to_elasticsearch @@ -174,5 +165,4 @@ class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker): self.elasticsearch_backend = elasticsearch_backend self.elasticsearch_index = elasticsearch_index self.entity_type = ChangelogEntry - self.elasticsearch_document_name = "changelog" self.transform_func = changelog_to_elasticsearch |