aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_worker.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2018-11-12 23:13:22 -0800
committerBryan Newbold <bnewbold@robocracy.org>2018-11-12 23:13:22 -0800
commit055c464deea8cdaccf3ed384995d4409b0f51409 (patch)
tree582e3d62d192d51fafed0c5e3321fd21a3a42d34 /python/fatcat_worker.py
parent6828313ecf2fea06e500f447f3abb92e5651c1fa (diff)
parent942f23257bcf1059a2502a3b019ce5af1bde7de5 (diff)
downloadfatcat-055c464deea8cdaccf3ed384995d4409b0f51409.tar.gz
fatcat-055c464deea8cdaccf3ed384995d4409b0f51409.zip
Merge branch 'kafka'
Diffstat (limited to 'python/fatcat_worker.py')
-rwxr-xr-xpython/fatcat_worker.py62
1 files changed, 62 insertions, 0 deletions
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
new file mode 100755
index 00000000..50ff0fb7
--- /dev/null
+++ b/python/fatcat_worker.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python3
+
+import sys
+import argparse
+from fatcat.changelog_workers import FatcatChangelogWorker, FatcatEntityUpdatesWorker
+from fatcat.elastic_workers import FatcatElasticReleaseWorker
+
+def run_changelog_worker(args):
+ topic = "fatcat-{}.changelog".format(args.env)
+ worker = FatcatChangelogWorker(args.api_host_url, args.kafka_hosts, topic,
+ args.poll_interval)
+ worker.run()
+
+def run_entity_updates_worker(args):
+ changelog_topic = "fatcat-{}.changelog".format(args.env)
+ release_topic = "fatcat-{}.release-updates".format(args.env)
+ worker = FatcatEntityUpdatesWorker(args.api_host_url, args.kafka_hosts,
+ changelog_topic, release_topic)
+ worker.run()
+
+def run_elastic_release_worker(args):
+ consume_topic = "fatcat-{}.release-updates".format(args.env)
+ worker = FatcatElasticReleaseWorker(args.kafka_hosts,
+ consume_topic)
+ worker.run()
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--debug',
+ action='store_true',
+ help="enable debug logging")
+ parser.add_argument('--api-host-url',
+ default="http://localhost:9411/v0",
+ help="fatcat API host/port to use")
+ parser.add_argument('--kafka-hosts',
+ default="localhost:9092",
+ help="list of Kafka brokers (host/port) to use")
+ parser.add_argument('--env',
+ default="qa",
+ help="Kafka topic namespace to use (eg, prod, qa)")
+ subparsers = parser.add_subparsers()
+
+ sub_changelog = subparsers.add_parser('changelog')
+ sub_changelog.set_defaults(func=run_changelog_worker)
+ sub_changelog.add_argument('--poll-interval',
+ help="how long to wait between polling (seconds)",
+ default=10.0, type=float)
+
+ sub_entity_updates = subparsers.add_parser('entity-updates')
+ sub_entity_updates.set_defaults(func=run_entity_updates_worker)
+
+ sub_elastic_release = subparsers.add_parser('elastic-release')
+ sub_elastic_release.set_defaults(func=run_elastic_release_worker)
+
+ args = parser.parse_args()
+ if not args.__dict__.get("func"):
+ print("tell me what to do!")
+ sys.exit(-1)
+ args.func(args)
+
+if __name__ == '__main__':
+ main()