aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2019-03-04 18:11:58 -0800
committerBryan Newbold <bnewbold@robocracy.org>2019-03-04 18:11:58 -0800
commit223d7847157ade4592ceaa2e88b2ccc2288a5da4 (patch)
tree1a7a5c3077cc2b6d7a78e5d0df3aa37bcad72050
parent7e4ca8f03e8bc07ca1811016a61005c78863e2f7 (diff)
downloadfatcat-223d7847157ade4592ceaa2e88b2ccc2288a5da4.tar.gz
fatcat-223d7847157ade4592ceaa2e88b2ccc2288a5da4.zip
fix elastic research worker api arg
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py7
1 files changed, 3 insertions, 4 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 0a49192e..0ec40eef 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -17,18 +17,17 @@ class ElasticsearchReleaseWorker(FatcatWorker):
Uses a consumer group to manage offset.
"""
- def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ def __init__(self, api, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic,
- api_host_url=None)
+ api=api)
self.consumer_group = "elasticsearch-updates"
self.elasticsearch_backend = elasticsearch_backend
self.elasticsearch_index = elasticsearch_index
def run(self):
consume_topic = self.kafka.topics[self.consume_topic]
- ac = ApiClient()
consumer = consume_topic.get_balanced_consumer(
consumer_group=self.consumer_group,
@@ -41,7 +40,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
for msg in consumer:
json_str = msg.value.decode('utf-8')
- release = entity_from_json(json_str, ReleaseEntity, api_client=ac)
+ release = entity_from_json(json_str, ReleaseEntity, api_client=self.api)
#print(release)
elasticsearch_endpoint = "{}/{}/release/{}".format(
self.elasticsearch_backend,