diff options
| -rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 18 | 
1 files changed, 4 insertions, 14 deletions
| diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py index 61854c31..4850bb0a 100644 --- a/python/fatcat_tools/workers/elasticsearch.py +++ b/python/fatcat_tools/workers/elasticsearch.py @@ -27,7 +27,6 @@ class ElasticsearchReleaseWorker(FatcatWorker):          self.elasticsearch_backend = elasticsearch_backend          self.elasticsearch_index = elasticsearch_index          self.entity_type = ReleaseEntity -        self.elasticsearch_document_name = "release"          self.transform_func = release_to_elasticsearch          self.api_host = api_host @@ -97,14 +96,8 @@ class ElasticsearchReleaseWorker(FatcatWorker):              bulk_actions = []              for msg in batch:                  json_str = msg.value().decode('utf-8') -                # HACK: work around a bug where container entities got published to -                # release_v03 topic -                if self.elasticsearch_document_name == "release": -                    entity_dict = json.loads(json_str) -                    if entity_dict.get('name') and not entity_dict.get('title'): -                        continue                  entity = entity_from_json(json_str, self.entity_type, api_client=ac) -                if self.elasticsearch_document_name == "changelog": +                if self.entity_type == ChangelogEntry:                      key = entity.index                      # might need to fetch from API                      if not (entity.editgroup and entity.editgroup.editor): @@ -117,11 +110,10 @@ class ElasticsearchReleaseWorker(FatcatWorker):                  }))                  bulk_actions.append(json.dumps(                      self.transform_func(entity))) -            print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.elasticsearch_document_name)) -            elasticsearch_endpoint = "{}/{}/{}/_bulk".format( +            print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type)) +            elasticsearch_endpoint = "{}/{}/_bulk".format(                  self.elasticsearch_backend, -                self.elasticsearch_index, -                self.elasticsearch_document_name) +                self.elasticsearch_index)              resp = requests.post(elasticsearch_endpoint,                  headers={"Content-Type": "application/x-ndjson"},                  data="\n".join(bulk_actions) + "\n") @@ -152,7 +144,6 @@ class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):          # previous group got corrupted (by pykafka library?)          self.consumer_group = "elasticsearch-updates3"          self.entity_type = ContainerEntity -        self.elasticsearch_document_name = "container"          self.transform_func = container_to_elasticsearch @@ -174,5 +165,4 @@ class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker):          self.elasticsearch_backend = elasticsearch_backend          self.elasticsearch_index = elasticsearch_index          self.entity_type = ChangelogEntry -        self.elasticsearch_document_name = "changelog"          self.transform_func = changelog_to_elasticsearch | 
