From 386cb8335d4d1a66b75301a244f7baed49658588 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 17 Jun 2020 18:06:12 -0700 Subject: tweak kafka topic names and seaweedfs layout --- python/sandcrawler_worker.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) (limited to 'python/sandcrawler_worker.py') diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 950eb4b..e18d883 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -71,9 +71,9 @@ def run_persist_grobid(args): pusher.run() def run_pdf_extract(args): - consume_topic = "sandcrawler-{}.unextracted-pg".format(args.env) - text_topic = "sandcrawler-{}.pdftext".format(args.env) - thumbnail_topic = "sandcrawler-{}.thumbnail-180px-jpeg".format(args.env) + consume_topic = "sandcrawler-{}.unextracted".format(args.env) + text_topic = "sandcrawler-{}.pdf-text".format(args.env) + thumbnail_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env) text_sink = KafkaSink( kafka_hosts=args.kafka_hosts, produce_topic=text_topic, @@ -100,7 +100,7 @@ def run_pdf_extract(args): pusher.run() def run_persist_pdftext(args): - consume_topic = "sandcrawler-{}.pdftext".format(args.env) + consume_topic = "sandcrawler-{}.pdf-text".format(args.env) worker = PersistPdfTextWorker( db_url=args.db_url, s3_url=args.s3_url, @@ -114,27 +114,28 @@ def run_persist_pdftext(args): worker=worker, kafka_hosts=args.kafka_hosts, consume_topic=consume_topic, - group="persist-pdftext", + group="persist-pdf-text", push_batches=True, batch_size=25, ) pusher.run() def run_persist_thumbnail(args): - consume_topic = "sandcrawler-{}.thumbnail".format(args.env) + consume_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env) worker = PersistThumbnailWorker( s3_url=args.s3_url, s3_bucket=args.s3_bucket, s3_access_key=args.s3_access_key, s3_secret_key=args.s3_secret_key, - # TODO: s3_extension=args.s3_extension, + s3_extension=".180px.jpg", + s3_folder="pdf", ) pusher = KafkaJsonPusher( worker=worker, kafka_hosts=args.kafka_hosts, consume_topic=consume_topic, - group="persist-thumbnail", - raw_records=True, + group="persist-pdf-thumbnail", + raw_record=True, batch_size=25, ) pusher.run() -- cgit v1.2.3