From 833f9bb5181419ca9f5af0f9ba0e2e047ee164d4 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 7 Dec 2021 19:09:54 -0800 Subject: worker: add kafka_group_suffix option --- python/sandcrawler_worker.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 52d126a..482dc33 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -99,6 +99,8 @@ def run_persist_grobid(args): kafka_group = "persist-grobid" if args.s3_only: kafka_group += "-s3" + if args.kafka_group_suffix: + kafka_group += args.kafka_group_suffix pusher = KafkaJsonPusher( worker=worker, kafka_hosts=args.kafka_hosts, @@ -124,6 +126,8 @@ def run_persist_pdftext(args): kafka_group = "persist-pdf-text" if args.s3_only: kafka_group += "-s3" + if args.kafka_group_suffix: + kafka_group += args.kafka_group_suffix pusher = KafkaJsonPusher( worker=worker, kafka_hosts=args.kafka_hosts, @@ -145,11 +149,14 @@ def run_persist_thumbnail(args): s3_extension=".180px.jpg", s3_folder="pdf", ) + kafka_group = "persist-pdf-thumbnail" + if args.kafka_group_suffix: + kafka_group += args.kafka_group_suffix pusher = KafkaJsonPusher( worker=worker, kafka_hosts=args.kafka_hosts, consume_topic=consume_topic, - group="persist-pdf-thumbnail", + group=kafka_group, push_batches=False, raw_records=True, batch_size=25, @@ -165,11 +172,14 @@ def run_persist_xml_doc(args: argparse.Namespace) -> None: s3_access_key=args.s3_access_key, s3_secret_key=args.s3_secret_key, ) + kafka_group = "persist-xml-doc" + if args.kafka_group_suffix: + kafka_group += args.kafka_group_suffix pusher = KafkaJsonPusher( worker=worker, kafka_hosts=args.kafka_hosts, consume_topic=consume_topic, - group="persist-xml-doc", + group=kafka_group, push_batches=False, batch_size=25, ) @@ -184,11 +194,14 @@ def run_persist_html_teixml(args: argparse.Namespace) -> None: s3_access_key=args.s3_access_key, s3_secret_key=args.s3_secret_key, ) + kafka_group = "persist-html-teixml" + if args.kafka_group_suffix: + kafka_group += args.kafka_group_suffix pusher = KafkaJsonPusher( worker=worker, kafka_hosts=args.kafka_hosts, consume_topic=consume_topic, - group="persist-html-teixml", + group=kafka_group, push_batches=False, batch_size=25, ) @@ -328,6 +341,9 @@ def main(): parser.add_argument( "--env", default="dev", help="Kafka topic namespace to use (eg, prod, qa, dev)" ) + parser.add_argument( + "--kafka-group-suffix", default="", help="Kafka consumer group suffix (optional)" + ) parser.add_argument( "--grobid-host", default="https://grobid.qa.fatcat.wiki", help="GROBID API host/port" ) -- cgit v1.2.3