aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2021-12-07 19:09:54 -0800
committerBryan Newbold <bnewbold@archive.org>2021-12-07 19:10:23 -0800
commit833f9bb5181419ca9f5af0f9ba0e2e047ee164d4 (patch)
tree0fa8201027edfedd6687a233f343361e01cfd615
parent5c82ee1b965e1f3901294c752d8b2d24c6bdc974 (diff)
downloadsandcrawler-833f9bb5181419ca9f5af0f9ba0e2e047ee164d4.tar.gz
sandcrawler-833f9bb5181419ca9f5af0f9ba0e2e047ee164d4.zip
worker: add kafka_group_suffix option
-rwxr-xr-xpython/sandcrawler_worker.py22
1 files changed, 19 insertions, 3 deletions
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index 52d126a..482dc33 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -99,6 +99,8 @@ def run_persist_grobid(args):
kafka_group = "persist-grobid"
if args.s3_only:
kafka_group += "-s3"
+ if args.kafka_group_suffix:
+ kafka_group += args.kafka_group_suffix
pusher = KafkaJsonPusher(
worker=worker,
kafka_hosts=args.kafka_hosts,
@@ -124,6 +126,8 @@ def run_persist_pdftext(args):
kafka_group = "persist-pdf-text"
if args.s3_only:
kafka_group += "-s3"
+ if args.kafka_group_suffix:
+ kafka_group += args.kafka_group_suffix
pusher = KafkaJsonPusher(
worker=worker,
kafka_hosts=args.kafka_hosts,
@@ -145,11 +149,14 @@ def run_persist_thumbnail(args):
s3_extension=".180px.jpg",
s3_folder="pdf",
)
+ kafka_group = "persist-pdf-thumbnail"
+ if args.kafka_group_suffix:
+ kafka_group += args.kafka_group_suffix
pusher = KafkaJsonPusher(
worker=worker,
kafka_hosts=args.kafka_hosts,
consume_topic=consume_topic,
- group="persist-pdf-thumbnail",
+ group=kafka_group,
push_batches=False,
raw_records=True,
batch_size=25,
@@ -165,11 +172,14 @@ def run_persist_xml_doc(args: argparse.Namespace) -> None:
s3_access_key=args.s3_access_key,
s3_secret_key=args.s3_secret_key,
)
+ kafka_group = "persist-xml-doc"
+ if args.kafka_group_suffix:
+ kafka_group += args.kafka_group_suffix
pusher = KafkaJsonPusher(
worker=worker,
kafka_hosts=args.kafka_hosts,
consume_topic=consume_topic,
- group="persist-xml-doc",
+ group=kafka_group,
push_batches=False,
batch_size=25,
)
@@ -184,11 +194,14 @@ def run_persist_html_teixml(args: argparse.Namespace) -> None:
s3_access_key=args.s3_access_key,
s3_secret_key=args.s3_secret_key,
)
+ kafka_group = "persist-html-teixml"
+ if args.kafka_group_suffix:
+ kafka_group += args.kafka_group_suffix
pusher = KafkaJsonPusher(
worker=worker,
kafka_hosts=args.kafka_hosts,
consume_topic=consume_topic,
- group="persist-html-teixml",
+ group=kafka_group,
push_batches=False,
batch_size=25,
)
@@ -329,6 +342,9 @@ def main():
"--env", default="dev", help="Kafka topic namespace to use (eg, prod, qa, dev)"
)
parser.add_argument(
+ "--kafka-group-suffix", default="", help="Kafka consumer group suffix (optional)"
+ )
+ parser.add_argument(
"--grobid-host", default="https://grobid.qa.fatcat.wiki", help="GROBID API host/port"
)
parser.add_argument(