diff options
author | Bryan Newbold <bnewbold@archive.org> | 2020-01-14 17:07:34 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2020-01-14 17:07:34 -0800 |
commit | d4d8e6f2026a5b4d1b08a008791f34140e678006 (patch) | |
tree | 4268d5ae8e50981dd176fcfd21a25b24cab5e17c | |
parent | c2eb4dad14c5b1d3566f39519065eb20eb7fd57f (diff) | |
download | sandcrawler-d4d8e6f2026a5b4d1b08a008791f34140e678006.tar.gz sandcrawler-d4d8e6f2026a5b4d1b08a008791f34140e678006.zip |
bulk ingest file request topic support
-rwxr-xr-x | python/sandcrawler_worker.py | 8 |
1 file changed, 7 insertions, 1 deletion
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 9199874..b164b8e 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -65,7 +65,10 @@ def run_persist_grobid(args): pusher.run() def run_ingest_file(args): - consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env) + if args.bulk: + consume_topic = "sandcrawler-{}.ingest-file-requests-bulk".format(args.env) + else: + consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env) produce_topic = "sandcrawler-{}.ingest-file-results".format(args.env) grobid_topic = "sandcrawler-{}.grobid-output-pg".format(args.env) sink = KafkaSink( @@ -150,6 +153,9 @@ def main(): sub_ingest_file = subparsers.add_parser('ingest-file', help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka") + sub_ingest_file.add_argument('--bulk', + action='store_true', + help="consume from bulk kafka topic (eg, for ingest backfill)") sub_ingest_file.set_defaults(func=run_ingest_file) sub_persist_ingest_file = subparsers.add_parser('persist-ingest-file', |