aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/fatcat_tools/workers/elasticsearch.py8
-rwxr-xr-xpython/fatcat_worker.py2
2 files changed, 5 insertions, 5 deletions
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index 0ec40eef..83310284 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -17,17 +17,17 @@ class ElasticsearchReleaseWorker(FatcatWorker):
Uses a consumer group to manage offset.
"""
- def __init__(self, api, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat"):
super().__init__(kafka_hosts=kafka_hosts,
- consume_topic=consume_topic,
- api=api)
+ consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates"
self.elasticsearch_backend = elasticsearch_backend
self.elasticsearch_index = elasticsearch_index
def run(self):
consume_topic = self.kafka.topics[self.consume_topic]
+ ac = ApiClient()
consumer = consume_topic.get_balanced_consumer(
consumer_group=self.consumer_group,
@@ -40,7 +40,7 @@ class ElasticsearchReleaseWorker(FatcatWorker):
for msg in consumer:
json_str = msg.value.decode('utf-8')
- release = entity_from_json(json_str, ReleaseEntity, api_client=self.api)
+ release = entity_from_json(json_str, ReleaseEntity, api_client=ac)
#print(release)
elasticsearch_endpoint = "{}/{}/release/{}".format(
self.elasticsearch_backend,
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index cc8d05e8..d9d21c6d 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -27,7 +27,7 @@ def run_entity_updates(args):
def run_elasticsearch_release(args):
consume_topic = "fatcat-{}.release-updates".format(args.env)
- worker = ElasticsearchReleaseWorker(args.api, args.kafka_hosts, consume_topic,
+ worker = ElasticsearchReleaseWorker(args.kafka_hosts, consume_topic,
elasticsearch_backend=args.elasticsearch_backend,
elasticsearch_index=args.elasticsearch_index)
worker.run()