summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/elasticsearch.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers/elasticsearch.py')
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py10
1 files changed, 6 insertions, 4 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 15d7aae3..e58b3da1 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -19,7 +19,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
- batch_size=200):
+ batch_size=200, api_host="https://api.fatcat.wiki/v0"):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates3"
@@ -30,9 +30,11 @@ class ElasticsearchReleaseWorker(FatcatWorker):
self.entity_type = ReleaseEntity
self.elasticsearch_document_name = "release"
self.transform_func = release_to_elasticsearch
+ self.api_host = api_host
def run(self):
ac = ApiClient()
+ api = public_api(self.api_host)
def fail_fast(err, partitions):
if err is not None:
@@ -103,11 +105,11 @@ class ElasticsearchReleaseWorker(FatcatWorker):
if entity_dict.get('name') and not entity_dict.get('title'):
continue
entity = entity_from_json(json_str, self.entity_type, api_client=ac)
- if self.entity_type == "changelog":
+ if self.elasticsearch_document_name == "changelog":
key = entity.index
# might need to fetch from API
if not (entity.editgroup and entity.editgroup.editor):
- entity = ac.get_changelog_entry(entity.index, expand="editgroup,editor")
+ entity = api.get_changelog_entry(entity.index)
else:
key = entity.ident
# TODO: handle deletions from index
@@ -116,7 +118,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
}))
bulk_actions.append(json.dumps(
self.transform_func(entity)))
- print("Upserting, eg, {} (of {} {} in elasticsearch)".format(entity.ident, len(batch), self.entity_type))
+ print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.elasticsearch_document_name))
elasticsearch_endpoint = "{}/{}/{}/_bulk".format(
self.elasticsearch_backend,
self.elasticsearch_index,