From 881b46e3b1682974f48fc196f483c3fa2648b998 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Sun, 4 Nov 2018 17:26:28 -0800 Subject: first-draft kafka workers (changelog, release_update) --- python/fatcat_worker.py | 52 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100755 python/fatcat_worker.py (limited to 'python/fatcat_worker.py') diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py new file mode 100755 index 00000000..cc11beca --- /dev/null +++ b/python/fatcat_worker.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 + +import sys +import argparse +from fatcat.changelog_workers import FatcatChangelogWorker, FatcatEntityUpdatesWorker + +def run_changelog_worker(args): + topic = "fatcat-{}.changelog".format(args.env) + worker = FatcatChangelogWorker(args.api_host_url, args.kafka_hosts, topic, + args.poll_interval) + worker.run() + +def run_entity_updates_worker(args): + changelog_topic = "fatcat-{}.changelog".format(args.env) + release_topic = "fatcat-{}.release-updates".format(args.env) + worker = FatcatEntityUpdatesWorker(args.api_host_url, args.kafka_hosts, + changelog_topic, release_topic) + worker.run() + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--debug', + action='store_true', + help="enable debug logging") + parser.add_argument('--api-host-url', + default="http://localhost:9411/v0", + help="fatcat API host/port to use") + parser.add_argument('--kafka-hosts', + default="localhost:9092", + help="list of Kafka brokers (host/port) to use") + parser.add_argument('--env', + default="qa", + help="Kafka topic namespace to use (eg, prod, qa)") + subparsers = parser.add_subparsers() + + sub_changelog = subparsers.add_parser('changelog') + sub_changelog.set_defaults(func=run_changelog_worker) + sub_changelog.add_argument('--poll-interval', + help="how long to wait between polling (seconds)", + default=10.0, type=float) + + sub_entity_updates = subparsers.add_parser('entity-updates') + sub_entity_updates.set_defaults(func=run_entity_updates_worker) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + args.func(args) + +if __name__ == '__main__': + main() -- cgit v1.2.3