diff options
Diffstat (limited to 'python/fatcat_tools/workers')
| -rw-r--r-- | python/fatcat_tools/workers/elasticsearch.py | 10 | 
1 files changed, 6 insertions, 4 deletions
| diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 15d7aae3..e58b3da1 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -19,7 +19,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
 
     def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
             elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
-            batch_size=200):
+            batch_size=200, api_host="https://api.fatcat.wiki/v0"):
         super().__init__(kafka_hosts=kafka_hosts,
                          consume_topic=consume_topic)
         self.consumer_group = "elasticsearch-updates3"
@@ -30,9 +30,11 @@ class ElasticsearchReleaseWorker(FatcatWorker):
         self.entity_type = ReleaseEntity
         self.elasticsearch_document_name = "release"
         self.transform_func = release_to_elasticsearch
+        self.api_host = api_host
 
     def run(self):
         ac = ApiClient()
+        api = public_api(self.api_host)
 
         def fail_fast(err, partitions):
             if err is not None:
@@ -103,11 +105,11 @@ class ElasticsearchReleaseWorker(FatcatWorker):
                     if entity_dict.get('name') and not entity_dict.get('title'):
                         continue
                 entity = entity_from_json(json_str, self.entity_type, api_client=ac)
-                if self.entity_type == "changelog":
+                if self.elasticsearch_document_name == "changelog":
                     key = entity.index
                     # might need to fetch from API
                     if not (entity.editgroup and entity.editgroup.editor):
-                        entity = ac.get_changelog_entry(entity.index, expand="editgroup,editor")
+                        entity = api.get_changelog_entry(entity.index)
                 else:
                     key = entity.ident
                 # TODO: handle deletions from index
@@ -116,7 +118,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
                 }))
                 bulk_actions.append(json.dumps(
                     self.transform_func(entity)))
-            print("Upserting, eg, {} (of {} {} in elasticsearch)".format(entity.ident, len(batch), self.entity_type))
+            print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.elasticsearch_document_name))
             elasticsearch_endpoint = "{}/{}/{}/_bulk".format(
                 self.elasticsearch_backend,
                 self.elasticsearch_index, | 
