about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Bryan Newbold <bnewbold@archive.org> 2020-01-14 17:07:34 -0800
committer Bryan Newbold <bnewbold@archive.org> 2020-01-14 17:07:34 -0800
commit d4d8e6f2026a5b4d1b08a008791f34140e678006 (patch)
tree 4268d5ae8e50981dd176fcfd21a25b24cab5e17c
parent c2eb4dad14c5b1d3566f39519065eb20eb7fd57f (diff)
download sandcrawler-d4d8e6f2026a5b4d1b08a008791f34140e678006.tar.gz
sandcrawler-d4d8e6f2026a5b4d1b08a008791f34140e678006.zip
bulk ingest file request topic support
-rwxr-xr-x python/sandcrawler_worker.py 8
1 files changed, 7 insertions, 1 deletions
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index 9199874..b164b8e 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -65,7 +65,10 @@ def run_persist_grobid(args):
pusher.run()
def run_ingest_file(args):
- consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
+ if args.bulk:
+ consume_topic = "sandcrawler-{}.ingest-file-requests-bulk".format(args.env)
+ else:
+ consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
produce_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
grobid_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
sink = KafkaSink(
@@ -150,6 +153,9 @@ def main():
sub_ingest_file = subparsers.add_parser('ingest-file',
help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka")
+ sub_ingest_file.add_argument('--bulk',
+ action='store_true',
+ help="consume from bulk kafka topic (eg, for ingest backfill)")
sub_ingest_file.set_defaults(func=run_ingest_file)
sub_persist_ingest_file = subparsers.add_parser('persist-ingest-file',