aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/fatcat_tools/workers/changelog.py8
-rw-r--r--python/fatcat_tools/workers/worker_common.py8
-rwxr-xr-xpython/fatcat_worker.py10
3 files changed, 12 insertions, 14 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index b6d99d06..8690a791 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -12,11 +12,11 @@ class ChangelogWorker(FatcatWorker):
found, fetch them and push (as JSON) into a Kafka topic.
"""
- def __init__(self, api_host_url, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
+ def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
# TODO: should be offset=0
super().__init__(kafka_hosts=kafka_hosts,
produce_topic=produce_topic,
- api_host_url=api_host_url)
+ api=api)
self.poll_interval = poll_interval
self.offset = offset # the fatcat changelog offset, not the kafka offset
@@ -61,10 +61,10 @@ class EntityUpdatesWorker(FatcatWorker):
For now, only release updates are published.
"""
- def __init__(self, api_host_url, kafka_hosts, consume_topic, release_topic):
+ def __init__(self, api, kafka_hosts, consume_topic, release_topic):
super().__init__(kafka_hosts=kafka_hosts,
consume_topic=consume_topic,
- api_host_url=api_host_url)
+ api=api)
self.release_topic = release_topic
self.consumer_group = "entity-updates"
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index e400e815..b84341c7 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -45,11 +45,9 @@ class FatcatWorker:
Common code for for Kafka producers and consumers.
"""
- def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api_host_url=None):
- if api_host_url:
- conf = fatcat_client.Configuration()
- conf.host = api_host_url
- self.api = fatcat_client.DefaultApi(fatcat_client.ApiClient(conf))
+ def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None):
+ if api:
+ self.api = api
self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
self.produce_topic = produce_topic
self.consume_topic = consume_topic
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index 3c4cacc1..0207fb19 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -14,20 +14,20 @@ sentry_client = raven.Client()
def run_changelog(args):
topic = "fatcat-{}.changelog".format(args.env)
worker = ChangelogWorker(args.api, args.kafka_hosts, topic,
- args.poll_interval)
+ poll_interval=args.poll_interval)
worker.run()
def run_entity_updates(args):
changelog_topic = "fatcat-{}.changelog".format(args.env)
release_topic = "fatcat-{}.release-updates".format(args.env)
- worker = EntityUpdatesWorker(args.api, args.kafka_hosts,
- changelog_topic, release_topic)
+ worker = EntityUpdatesWorker(args.api, args.kafka_hosts, changelog_topic,
+ release_topic=release_topic)
worker.run()
def run_elasticsearch_release(args):
consume_topic = "fatcat-{}.release-updates".format(args.env)
- worker = ElasticsearchReleaseWorker(args.kafka_hosts,
- consume_topic, elasticsearch_backend=args.elasticsearch_backend,
+ worker = ElasticsearchReleaseWorker(args.kafka_hosts, consume_topic,
+ elasticsearch_backend=args.elasticsearch_backend,
elasticsearch_index=args.elasticsearch_index)
worker.run()