diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-04 17:26:28 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-04 17:26:28 -0800 |
commit | 881b46e3b1682974f48fc196f483c3fa2648b998 (patch) | |
tree | ba73e463482b8a125d92ef1942cbaa1f19f535f6 /python/fatcat_worker.py | |
parent | 87b8f1c710c7c94ed9c1b040f75c1601591a214b (diff) | |
download | fatcat-881b46e3b1682974f48fc196f483c3fa2648b998.tar.gz fatcat-881b46e3b1682974f48fc196f483c3fa2648b998.zip |
first-draft kafka workers (changelog, release_update)
Diffstat (limited to 'python/fatcat_worker.py')
-rwxr-xr-x | python/fatcat_worker.py | 52 |
1 files changed, 52 insertions, 0 deletions
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py new file mode 100755 index 00000000..cc11beca --- /dev/null +++ b/python/fatcat_worker.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 + +import sys +import argparse +from fatcat.changelog_workers import FatcatChangelogWorker, FatcatEntityUpdatesWorker + +def run_changelog_worker(args): + topic = "fatcat-{}.changelog".format(args.env) + worker = FatcatChangelogWorker(args.api_host_url, args.kafka_hosts, topic, + args.poll_interval) + worker.run() + +def run_entity_updates_worker(args): + changelog_topic = "fatcat-{}.changelog".format(args.env) + release_topic = "fatcat-{}.release-updates".format(args.env) + worker = FatcatEntityUpdatesWorker(args.api_host_url, args.kafka_hosts, + changelog_topic, release_topic) + worker.run() + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--debug', + action='store_true', + help="enable debug logging") + parser.add_argument('--api-host-url', + default="http://localhost:9411/v0", + help="fatcat API host/port to use") + parser.add_argument('--kafka-hosts', + default="localhost:9092", + help="list of Kafka brokers (host/port) to use") + parser.add_argument('--env', + default="qa", + help="Kafka topic namespace to use (eg, prod, qa)") + subparsers = parser.add_subparsers() + + sub_changelog = subparsers.add_parser('changelog') + sub_changelog.set_defaults(func=run_changelog_worker) + sub_changelog.add_argument('--poll-interval', + help="how long to wait between polling (seconds)", + default=10.0, type=float) + + sub_entity_updates = subparsers.add_parser('entity-updates') + sub_entity_updates.set_defaults(func=run_entity_updates_worker) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + args.func(args) + +if __name__ == '__main__': + main() |