From 386cb8335d4d1a66b75301a244f7baed49658588 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 17 Jun 2020 18:06:12 -0700 Subject: tweak kafka topic names and seaweedfs layout --- python/sandcrawler/persist.py | 3 ++- python/sandcrawler_worker.py | 19 ++++++++++--------- 2 files changed, 12 insertions(+), 10 deletions(-) (limited to 'python') diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index 196c4b9..8d421ad 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -452,6 +452,7 @@ class PersistThumbnailWorker(SandcrawlerWorker): default_bucket=kwargs['s3_bucket'], ) self.s3_extension = kwargs.get('s3_extension', ".jpg") + self.s3_folder = kwargs.get('s3_folder', "pdf") def process(self, blob, key=None): """ @@ -463,7 +464,7 @@ class PersistThumbnailWorker(SandcrawlerWorker): assert len(blob) >= 50 resp = self.s3.put_blob( - folder="thumbnail", + folder=self.s3_folder, blob=blob, sha1hex=key, extension=self.s3_extension, diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 950eb4b..e18d883 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -71,9 +71,9 @@ def run_persist_grobid(args): pusher.run() def run_pdf_extract(args): - consume_topic = "sandcrawler-{}.unextracted-pg".format(args.env) - text_topic = "sandcrawler-{}.pdftext".format(args.env) - thumbnail_topic = "sandcrawler-{}.thumbnail-180px-jpeg".format(args.env) + consume_topic = "sandcrawler-{}.unextracted".format(args.env) + text_topic = "sandcrawler-{}.pdf-text".format(args.env) + thumbnail_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env) text_sink = KafkaSink( kafka_hosts=args.kafka_hosts, produce_topic=text_topic, @@ -100,7 +100,7 @@ def run_pdf_extract(args): pusher.run() def run_persist_pdftext(args): - consume_topic = "sandcrawler-{}.pdftext".format(args.env) + consume_topic = "sandcrawler-{}.pdf-text".format(args.env) worker = PersistPdfTextWorker( db_url=args.db_url, s3_url=args.s3_url, @@ -114,27 +114,28 @@ def run_persist_pdftext(args): worker=worker, kafka_hosts=args.kafka_hosts, consume_topic=consume_topic, - group="persist-pdftext", + group="persist-pdf-text", push_batches=True, batch_size=25, ) pusher.run() def run_persist_thumbnail(args): - consume_topic = "sandcrawler-{}.thumbnail".format(args.env) + consume_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env) worker = PersistThumbnailWorker( s3_url=args.s3_url, s3_bucket=args.s3_bucket, s3_access_key=args.s3_access_key, s3_secret_key=args.s3_secret_key, - # TODO: s3_extension=args.s3_extension, + s3_extension=".180px.jpg", + s3_folder="pdf", ) pusher = KafkaJsonPusher( worker=worker, kafka_hosts=args.kafka_hosts, consume_topic=consume_topic, - group="persist-thumbnail", - raw_records=True, + group="persist-pdf-thumbnail", + raw_record=True, batch_size=25, ) pusher.run() -- cgit v1.2.3