diff options
Diffstat (limited to 'python/sandcrawler_worker.py')
-rwxr-xr-x | python/sandcrawler_worker.py | 83 |
1 files changed, 83 insertions, 0 deletions
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 5720f48..950eb4b 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -70,6 +70,75 @@ def run_persist_grobid(args): ) pusher.run() +def run_pdf_extract(args): + consume_topic = "sandcrawler-{}.unextracted-pg".format(args.env) + text_topic = "sandcrawler-{}.pdftext".format(args.env) + thumbnail_topic = "sandcrawler-{}.thumbnail-180px-jpeg".format(args.env) + text_sink = KafkaSink( + kafka_hosts=args.kafka_hosts, + produce_topic=text_topic, + ) + thumbnail_sink = KafkaSink( + kafka_hosts=args.kafka_hosts, + produce_topic=thumbnail_topic, + ) + wayback_client = WaybackClient( + host_url=args.grobid_host, + ) + worker = PdfExtractWorker( + wayback_client=wayback_client, + sink=text_sink, + thumbnail_sink=thumbnail_sink, + ) + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="pdf-extract", + batch_size=1, + ) + pusher.run() + +def run_persist_pdftext(args): + consume_topic = "sandcrawler-{}.pdftext".format(args.env) + worker = PersistPdfTextWorker( + db_url=args.db_url, + s3_url=args.s3_url, + s3_bucket=args.s3_bucket, + s3_access_key=args.s3_access_key, + s3_secret_key=args.s3_secret_key, + s3_only=args.s3_only, + db_only=args.db_only, + ) + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="persist-pdftext", + push_batches=True, + batch_size=25, + ) + pusher.run() + +def run_persist_thumbnail(args): + consume_topic = "sandcrawler-{}.thumbnail".format(args.env) + worker = PersistThumbnailWorker( + s3_url=args.s3_url, + s3_bucket=args.s3_bucket, + s3_access_key=args.s3_access_key, + s3_secret_key=args.s3_secret_key, + # TODO: s3_extension=args.s3_extension, + ) + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="persist-thumbnail", + raw_records=True, + batch_size=25, + ) + pusher.run() + def run_persist_pdftrio(args): consume_topic = "sandcrawler-{}.pdftrio-output".format(args.env) worker = PersistPdfTrioWorker( @@ -179,6 +248,20 @@ def main(): help="only write status to database (don't upload TEI-XML to S3)") sub_persist_grobid.set_defaults(func=run_persist_grobid) + sub_persist_pdftext = subparsers.add_parser('persist-pdftext', + help="daemon that consumes pdftext output from Kafka and pushes to minio and postgres") + sub_persist_pdftext.add_argument('--s3-only', + action='store_true', + help="only upload TEI-XML to S3 (don't write to database)") + sub_persist_pdftext.add_argument('--db-only', + action='store_true', + help="only write status to database (don't upload TEI-XML to S3)") + sub_persist_pdftext.set_defaults(func=run_persist_pdftext) + + sub_persist_thumbnail = subparsers.add_parser('persist-thumbnail', + help="daemon that consumes thumbnail output from Kafka and pushes to minio and postgres") + sub_persist_thumbnail.set_defaults(func=run_persist_thumbnail) + sub_persist_pdftrio = subparsers.add_parser('persist-pdftrio', help="daemon that consumes pdftrio output from Kafka and pushes to postgres") sub_persist_pdftrio.set_defaults(func=run_persist_pdftrio) |