summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers/elasticsearch.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/workers/elasticsearch.py')
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py7
1 files changed, 3 insertions, 4 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 0a49192e..0ec40eef 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -17,18 +17,17 @@ class ElasticsearchReleaseWorker(FatcatWorker):
Uses a consumer group to manage offset.
"""
- def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ def __init__(self, api, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic,
- api_host_url=None)
+ api=api)
self.consumer_group = "elasticsearch-updates"
self.elasticsearch_backend = elasticsearch_backend
self.elasticsearch_index = elasticsearch_index
def run(self):
consume_topic = self.kafka.topics[self.consume_topic]
- ac = ApiClient()
consumer = consume_topic.get_balanced_consumer(
consumer_group=self.consumer_group,
@@ -41,7 +40,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
for msg in consumer:
json_str = msg.value.decode('utf-8')
- release = entity_from_json(json_str, ReleaseEntity, api_client=ac)
+ release = entity_from_json(json_str, ReleaseEntity, api_client=self.api)
#print(release)
elasticsearch_endpoint = "{}/{}/release/{}".format(
self.elasticsearch_backend,