diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/fatcat_tools/workers/changelog.py | 8 | ||||
| -rw-r--r-- | python/fatcat_tools/workers/worker_common.py | 8 | ||||
| -rwxr-xr-x | python/fatcat_worker.py | 10 | 
3 files changed, 12 insertions, 14 deletions
| diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py index b6d99d06..8690a791 100644 --- a/python/fatcat_tools/workers/changelog.py +++ b/python/fatcat_tools/workers/changelog.py @@ -12,11 +12,11 @@ class ChangelogWorker(FatcatWorker):      found, fetch them and push (as JSON) into a Kafka topic.      """ -    def __init__(self, api_host_url, kafka_hosts, produce_topic, poll_interval=10.0, offset=None): +    def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):          # TODO: should be offset=0          super().__init__(kafka_hosts=kafka_hosts,                           produce_topic=produce_topic, -                         api_host_url=api_host_url) +                         api=api)          self.poll_interval = poll_interval          self.offset = offset    # the fatcat changelog offset, not the kafka offset @@ -61,10 +61,10 @@ class EntityUpdatesWorker(FatcatWorker):      For now, only release updates are published.      """ -    def __init__(self, api_host_url, kafka_hosts, consume_topic, release_topic): +    def __init__(self, api, kafka_hosts, consume_topic, release_topic):          super().__init__(kafka_hosts=kafka_hosts,                           consume_topic=consume_topic, -                         api_host_url=api_host_url) +                         api=api)          self.release_topic = release_topic          self.consumer_group = "entity-updates" diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py index e400e815..b84341c7 100644 --- a/python/fatcat_tools/workers/worker_common.py +++ b/python/fatcat_tools/workers/worker_common.py @@ -45,11 +45,9 @@ class FatcatWorker:      Common code for for Kafka producers and consumers.      """ -    def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api_host_url=None): -        if api_host_url: -            conf = fatcat_client.Configuration() -            conf.host = api_host_url -            self.api = fatcat_client.DefaultApi(fatcat_client.ApiClient(conf)) +    def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api=None): +        if api: +            self.api = api          self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")          self.produce_topic = produce_topic          self.consume_topic = consume_topic diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index 3c4cacc1..0207fb19 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -14,20 +14,20 @@ sentry_client = raven.Client()  def run_changelog(args):      topic = "fatcat-{}.changelog".format(args.env)      worker = ChangelogWorker(args.api, args.kafka_hosts, topic, -        args.poll_interval) +        poll_interval=args.poll_interval)      worker.run()  def run_entity_updates(args):      changelog_topic = "fatcat-{}.changelog".format(args.env)      release_topic = "fatcat-{}.release-updates".format(args.env) -    worker = EntityUpdatesWorker(args.api, args.kafka_hosts, -        changelog_topic, release_topic) +    worker = EntityUpdatesWorker(args.api, args.kafka_hosts, changelog_topic, +        release_topic=release_topic)      worker.run()  def run_elasticsearch_release(args):      consume_topic = "fatcat-{}.release-updates".format(args.env) -    worker = ElasticsearchReleaseWorker(args.kafka_hosts, -        consume_topic, elasticsearch_backend=args.elasticsearch_backend, +    worker = ElasticsearchReleaseWorker(args.kafka_hosts, consume_topic, +        elasticsearch_backend=args.elasticsearch_backend,          elasticsearch_index=args.elasticsearch_index)      worker.run() | 
