summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/workers
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2019-03-04 18:53:01 -0800
committerBryan Newbold <bnewbold@robocracy.org>2019-03-04 18:53:03 -0800
commit30a8fc5f78eb15f641d6e8da0e7c0741d3f82b13 (patch)
tree1f53b6e78875c36bcac8180a0aa5d211da824916 /python/fatcat_tools/workers
parent0ad7f9385bd9b459a4f6b23dbdfcaeacc58ebb69 (diff)
downloadfatcat-30a8fc5f78eb15f641d6e8da0e7c0741d3f82b13.tar.gz
fatcat-30a8fc5f78eb15f641d6e8da0e7c0741d3f82b13.zip
elastic-release worker w/o API
Forgot that this worker really doesn't want/need any API connection at all; just an ApiClient to deserialize objects from Kafka.
Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py8
1 files changed, 4 insertions, 4 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 0ec40eef..83310284 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -17,17 +17,17 @@ class ElasticsearchReleaseWorker(FatcatWorker):
Uses a consumer group to manage offset.
"""
- def __init__(self, api, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"):
super().__init__(kafka_hosts=kafka_hosts,
- consume_topic=consume_topic,
- api=api)
+ consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates"
self.elasticsearch_backend = elasticsearch_backend
self.elasticsearch_index = elasticsearch_index
def run(self):
consume_topic = self.kafka.topics[self.consume_topic]
+ ac = ApiClient()
consumer = consume_topic.get_balanced_consumer(
consumer_group=self.consumer_group,
@@ -40,7 +40,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
for msg in consumer:
json_str = msg.value.decode('utf-8')
- release = entity_from_json(json_str, ReleaseEntity, api_client=self.api)
+ release = entity_from_json(json_str, ReleaseEntity, api_client=ac)
#print(release)
elasticsearch_endpoint = "{}/{}/release/{}".format(
self.elasticsearch_backend,