Diffstat (limited to 'python/sandcrawler_worker.py')
-rwxr-xr-x | python/sandcrawler_worker.py | 151
1 file changed, 87 insertions, 64 deletions
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index e185fad..3c76c17 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -1,5 +1,4 @@
 #!/usr/bin/env python3
-
 """
 These are generally for continuously running workers that consume from Kafka.
 Outputs might either be pushed back into Kafka, or directly into sandcrawler-db
@@ -31,12 +30,8 @@ def run_grobid_extract(args):
         kafka_hosts=args.kafka_hosts,
         produce_topic=produce_topic,
     )
-    grobid_client = GrobidClient(
-        host_url=args.grobid_host,
-    )
-    wayback_client = WaybackClient(
-        host_url=args.grobid_host,
-    )
+    grobid_client = GrobidClient(host_url=args.grobid_host, )
+    wayback_client = WaybackClient(host_url=args.grobid_host, )
     worker = GrobidWorker(
         grobid_client=grobid_client,
         wayback_client=wayback_client,
@@ -51,6 +46,7 @@ def run_grobid_extract(args):
     )
     pusher.run()
 
+
 def run_pdf_extract(args):
     consume_topic = "sandcrawler-{}.unextracted".format(args.env)
     pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env)
@@ -63,9 +59,7 @@
         kafka_hosts=args.kafka_hosts,
         produce_topic=thumbnail_topic,
     )
-    wayback_client = WaybackClient(
-        host_url=args.grobid_host,
-    )
+    wayback_client = WaybackClient(host_url=args.grobid_host, )
     worker = PdfExtractWorker(
         wayback_client=wayback_client,
         sink=pdftext_sink,
@@ -81,6 +75,7 @@
     )
     pusher.run()
 
+
 def run_persist_grobid(args):
     consume_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
     worker = PersistGrobidWorker(
@@ -105,6 +100,7 @@
     )
     pusher.run()
 
+
 def run_persist_pdftext(args):
     consume_topic = "sandcrawler-{}.pdf-text".format(args.env)
     worker = PersistPdfTextWorker(
@@ -129,6 +125,7 @@
     )
     pusher.run()
 
+
 def run_persist_thumbnail(args):
     consume_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env)
     worker = PersistThumbnailWorker(
@@ -150,6 +147,7 @@
     )
     pusher.run()
 
+
 def run_persist_xml_doc(args: argparse.Namespace) -> None:
     consume_topic = f"sandcrawler-{args.env}.xml-doc"
     worker = PersistXmlDocWorker(
@@ -168,6 +166,7 @@
     )
     pusher.run()
 
+
 def run_persist_html_teixml(args: argparse.Namespace) -> None:
     consume_topic = f"sandcrawler-{args.env}.html-teixml"
     worker = PersistHtmlTeiXmlWorker(
@@ -186,11 +185,10 @@ def run_persist_html_teixml(args: argparse.Namespace) -> None:
     )
     pusher.run()
 
+
 def run_persist_pdftrio(args):
     consume_topic = "sandcrawler-{}.pdftrio-output".format(args.env)
-    worker = PersistPdfTrioWorker(
-        db_url=args.db_url,
-    )
+    worker = PersistPdfTrioWorker(db_url=args.db_url, )
     pusher = KafkaJsonPusher(
         worker=worker,
         kafka_hosts=args.kafka_hosts,
@@ -201,6 +199,7 @@
     )
     pusher.run()
 
+
 def run_ingest_file(args):
     spn_cdx_retry_sec = 9.0
     if args.bulk:
@@ -228,9 +227,7 @@
         kafka_hosts=args.kafka_hosts,
         produce_topic=grobid_topic,
     )
-    grobid_client = GrobidClient(
-        host_url=args.grobid_host,
-    )
+    grobid_client = GrobidClient(host_url=args.grobid_host, )
     pdftext_sink = KafkaCompressSink(
         kafka_hosts=args.kafka_hosts,
         produce_topic=pdftext_topic,
@@ -268,11 +265,10 @@
     )
     pusher.run()
 
+
 def run_persist_ingest_file(args):
     consume_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
-    worker = PersistIngestFileResultWorker(
-        db_url=args.db_url,
-    )
+    worker = PersistIngestFileResultWorker(db_url=args.db_url, )
     pusher = KafkaJsonPusher(
         worker=worker,
         kafka_hosts=args.kafka_hosts,
@@ -283,90 +279,116 @@ def run_persist_ingest_file(args):
     )
     pusher.run()
 
+
 def main():
-    parser = argparse.ArgumentParser(
-        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
     parser.add_argument('--kafka-hosts',
-        default="localhost:9092",
-        help="list of Kafka brokers (host/port) to use")
+                        default="localhost:9092",
+                        help="list of Kafka brokers (host/port) to use")
     parser.add_argument('--env',
-        default="dev",
-        help="Kafka topic namespace to use (eg, prod, qa, dev)")
+                        default="dev",
+                        help="Kafka topic namespace to use (eg, prod, qa, dev)")
     parser.add_argument('--grobid-host',
-        default="http://grobid.qa.fatcat.wiki",
-        help="GROBID API host/port")
+                        default="http://grobid.qa.fatcat.wiki",
+                        help="GROBID API host/port")
     parser.add_argument('--db-url',
-        help="postgresql database connection string",
-        default="postgres:///sandcrawler")
-    parser.add_argument('--s3-url',
-        help="S3 (seaweedfs) backend URL",
-        default="localhost:9000")
+                        help="postgresql database connection string",
+                        default="postgres:///sandcrawler")
+    parser.add_argument('--s3-url', help="S3 (seaweedfs) backend URL", default="localhost:9000")
     parser.add_argument('--s3-access-key',
-        help="S3 (seaweedfs) credential",
-        default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') or os.environ.get('MINIO_ACCESS_KEY'))
+                        help="S3 (seaweedfs) credential",
+                        default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY')
+                        or os.environ.get('MINIO_ACCESS_KEY'))
     parser.add_argument('--s3-secret-key',
-        help="S3 (seaweedfs) credential",
-        default=os.environ.get('SANDCRAWLER_BLOB_SECRET_KEY') or os.environ.get('MINIO_SECRET_KEY'))
+                        help="S3 (seaweedfs) credential",
+                        default=os.environ.get('SANDCRAWLER_BLOB_SECRET_KEY')
+                        or os.environ.get('MINIO_SECRET_KEY'))
     parser.add_argument('--s3-bucket',
-        help="S3 (seaweedfs) bucket to persist into",
-        default="sandcrawler-dev")
+                        help="S3 (seaweedfs) bucket to persist into",
+                        default="sandcrawler-dev")
     subparsers = parser.add_subparsers()
 
-    sub_grobid_extract = subparsers.add_parser('grobid-extract',
-        help="daemon that consumes CDX JSON objects from Kafka, uses GROBID to extract XML, pushes to Kafka")
+    sub_grobid_extract = subparsers.add_parser(
+        'grobid-extract',
+        help=
+        "daemon that consumes CDX JSON objects from Kafka, uses GROBID to extract XML, pushes to Kafka"
+    )
     sub_grobid_extract.set_defaults(func=run_grobid_extract)
 
-    sub_pdf_extract = subparsers.add_parser('pdf-extract',
-        help="daemon that consumes CDX JSON objects from Kafka, extracts text and thumbnail, pushes to Kafka")
+    sub_pdf_extract = subparsers.add_parser(
+        'pdf-extract',
+        help=
+        "daemon that consumes CDX JSON objects from Kafka, extracts text and thumbnail, pushes to Kafka"
+    )
     sub_pdf_extract.set_defaults(func=run_pdf_extract)
 
-    sub_persist_grobid = subparsers.add_parser('persist-grobid',
-        help="daemon that consumes GROBID output from Kafka and pushes to S3 (seaweedfs) and postgres")
+    sub_persist_grobid = subparsers.add_parser(
+        'persist-grobid',
+        help=
+        "daemon that consumes GROBID output from Kafka and pushes to S3 (seaweedfs) and postgres"
+    )
     sub_persist_grobid.add_argument('--s3-only',
-        action='store_true',
-        help="only upload TEI-XML to S3 (don't write to database)")
-    sub_persist_grobid.add_argument('--db-only',
+                                    action='store_true',
+                                    help="only upload TEI-XML to S3 (don't write to database)")
+    sub_persist_grobid.add_argument(
+        '--db-only',
         action='store_true',
         help="only write status to database (don't upload TEI-XML to S3)")
     sub_persist_grobid.set_defaults(func=run_persist_grobid)
 
-    sub_persist_pdftext = subparsers.add_parser('persist-pdftext',
-        help="daemon that consumes pdftext output from Kafka and pushes to S3 (seaweedfs) and postgres")
+    sub_persist_pdftext = subparsers.add_parser(
+        'persist-pdftext',
+        help=
+        "daemon that consumes pdftext output from Kafka and pushes to S3 (seaweedfs) and postgres"
+    )
     sub_persist_pdftext.add_argument('--s3-only',
-        action='store_true',
-        help="only upload TEI-XML to S3 (don't write to database)")
-    sub_persist_pdftext.add_argument('--db-only',
+                                     action='store_true',
+                                     help="only upload TEI-XML to S3 (don't write to database)")
+    sub_persist_pdftext.add_argument(
+        '--db-only',
         action='store_true',
         help="only write status to database (don't upload TEI-XML to S3)")
     sub_persist_pdftext.set_defaults(func=run_persist_pdftext)
 
-    sub_persist_thumbnail = subparsers.add_parser('persist-thumbnail',
-        help="daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres")
+    sub_persist_thumbnail = subparsers.add_parser(
+        'persist-thumbnail',
+        help=
+        "daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres"
    )
     sub_persist_thumbnail.set_defaults(func=run_persist_thumbnail)
 
-    sub_persist_xml_doc = subparsers.add_parser('persist-xml-doc',
-        help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket")
+    sub_persist_xml_doc = subparsers.add_parser(
+        'persist-xml-doc',
+        help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket"
+    )
     sub_persist_xml_doc.set_defaults(func=run_persist_xml_doc)
 
-    sub_persist_html_teixml = subparsers.add_parser('persist-html-teixml',
-        help="daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket")
+    sub_persist_html_teixml = subparsers.add_parser(
+        'persist-html-teixml',
+        help=
+        "daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket"
+    )
     sub_persist_html_teixml.set_defaults(func=run_persist_html_teixml)
 
-    sub_persist_pdftrio = subparsers.add_parser('persist-pdftrio',
+    sub_persist_pdftrio = subparsers.add_parser(
+        'persist-pdftrio',
         help="daemon that consumes pdftrio output from Kafka and pushes to postgres")
     sub_persist_pdftrio.set_defaults(func=run_persist_pdftrio)
 
-    sub_ingest_file = subparsers.add_parser('ingest-file',
+    sub_ingest_file = subparsers.add_parser(
+        'ingest-file',
         help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka")
     sub_ingest_file.add_argument('--bulk',
-        action='store_true',
-        help="consume from bulk kafka topic (eg, for ingest backfill)")
-    sub_ingest_file.add_argument('--priority',
+                                 action='store_true',
+                                 help="consume from bulk kafka topic (eg, for ingest backfill)")
+    sub_ingest_file.add_argument(
+        '--priority',
         action='store_true',
         help="consume from priority kafka topic (eg, for SPN requests)")
     sub_ingest_file.set_defaults(func=run_ingest_file)
 
-    sub_persist_ingest_file = subparsers.add_parser('persist-ingest-file',
+    sub_persist_ingest_file = subparsers.add_parser(
+        'persist-ingest-file',
        help="daemon that consumes ingest-file output from Kafka and pushes to postgres")
     sub_persist_ingest_file.set_defaults(func=run_persist_ingest_file)
 
@@ -377,5 +399,6 @@ def main():
 
     args.func(args)
 
+
 if __name__ == '__main__':
     main()