aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler_worker.py')
-rwxr-xr-xpython/sandcrawler_worker.py83
1 files changed, 83 insertions, 0 deletions
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index 5720f48..950eb4b 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -70,6 +70,75 @@ def run_persist_grobid(args):
)
pusher.run()
+def run_pdf_extract(args):
+ consume_topic = "sandcrawler-{}.unextracted-pg".format(args.env)
+ text_topic = "sandcrawler-{}.pdftext".format(args.env)
+ thumbnail_topic = "sandcrawler-{}.thumbnail-180px-jpeg".format(args.env)
+ text_sink = KafkaSink(
+ kafka_hosts=args.kafka_hosts,
+ produce_topic=text_topic,
+ )
+ thumbnail_sink = KafkaSink(
+ kafka_hosts=args.kafka_hosts,
+ produce_topic=thumbnail_topic,
+ )
+ wayback_client = WaybackClient(
+ host_url=args.grobid_host,
+ )
+ worker = PdfExtractWorker(
+ wayback_client=wayback_client,
+ sink=text_sink,
+ thumbnail_sink=thumbnail_sink,
+ )
+ pusher = KafkaJsonPusher(
+ worker=worker,
+ kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic,
+ group="pdf-extract",
+ batch_size=1,
+ )
+ pusher.run()
+
+def run_persist_pdftext(args):
+ consume_topic = "sandcrawler-{}.pdftext".format(args.env)
+ worker = PersistPdfTextWorker(
+ db_url=args.db_url,
+ s3_url=args.s3_url,
+ s3_bucket=args.s3_bucket,
+ s3_access_key=args.s3_access_key,
+ s3_secret_key=args.s3_secret_key,
+ s3_only=args.s3_only,
+ db_only=args.db_only,
+ )
+ pusher = KafkaJsonPusher(
+ worker=worker,
+ kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic,
+ group="persist-pdftext",
+ push_batches=True,
+ batch_size=25,
+ )
+ pusher.run()
+
+def run_persist_thumbnail(args):
+ consume_topic = "sandcrawler-{}.thumbnail".format(args.env)
+ worker = PersistThumbnailWorker(
+ s3_url=args.s3_url,
+ s3_bucket=args.s3_bucket,
+ s3_access_key=args.s3_access_key,
+ s3_secret_key=args.s3_secret_key,
+ # TODO: s3_extension=args.s3_extension,
+ )
+ pusher = KafkaJsonPusher(
+ worker=worker,
+ kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic,
+ group="persist-thumbnail",
+ raw_records=True,
+ batch_size=25,
+ )
+ pusher.run()
+
def run_persist_pdftrio(args):
consume_topic = "sandcrawler-{}.pdftrio-output".format(args.env)
worker = PersistPdfTrioWorker(
@@ -179,6 +248,20 @@ def main():
help="only write status to database (don't upload TEI-XML to S3)")
sub_persist_grobid.set_defaults(func=run_persist_grobid)
+ sub_persist_pdftext = subparsers.add_parser('persist-pdftext',
+ help="daemon that consumes pdftext output from Kafka and pushes to minio and postgres")
+ sub_persist_pdftext.add_argument('--s3-only',
+ action='store_true',
+ help="only upload TEI-XML to S3 (don't write to database)")
+ sub_persist_pdftext.add_argument('--db-only',
+ action='store_true',
+ help="only write status to database (don't upload TEI-XML to S3)")
+ sub_persist_pdftext.set_defaults(func=run_persist_pdftext)
+
+ sub_persist_thumbnail = subparsers.add_parser('persist-thumbnail',
+ help="daemon that consumes thumbnail output from Kafka and pushes to minio and postgres")
+ sub_persist_thumbnail.set_defaults(func=run_persist_thumbnail)
+
sub_persist_pdftrio = subparsers.add_parser('persist-pdftrio',
help="daemon that consumes pdftrio output from Kafka and pushes to postgres")
sub_persist_pdftrio.set_defaults(func=run_persist_pdftrio)