diff options
author | Bryan Newbold <bnewbold@archive.org> | 2021-10-27 18:50:17 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2021-10-27 18:50:17 -0700 |
commit | 826c7538e091fac14d987a3cd654975da964e240 (patch) | |
tree | 90345b4cabb461c624ca5a218c2fc01dce3055cd /python/sandcrawler_worker.py | |
parent | 020037d4714e7ba2ab172c7278494aed0b2148ad (diff) | |
download | sandcrawler-826c7538e091fac14d987a3cd654975da964e240.tar.gz sandcrawler-826c7538e091fac14d987a3cd654975da964e240.zip |
make fmt (black 21.9b0)
Diffstat (limited to 'python/sandcrawler_worker.py')
-rwxr-xr-x | python/sandcrawler_worker.py | 178 |
1 file changed, 102 insertions, 76 deletions
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 3e35807..d42cd8c 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -16,7 +16,7 @@ from sandcrawler.persist import PersistHtmlTeiXmlWorker, PersistXmlDocWorker # Yep, a global. Gets DSN from `SENTRY_DSN` environment variable try: - git_sha = raven.fetch_git_sha('..') + git_sha = raven.fetch_git_sha("..") except Exception: git_sha = None sentry_client = raven.Client(release=git_sha) @@ -29,8 +29,12 @@ def run_grobid_extract(args): kafka_hosts=args.kafka_hosts, produce_topic=produce_topic, ) - grobid_client = GrobidClient(host_url=args.grobid_host, ) - wayback_client = WaybackClient(host_url=args.grobid_host, ) + grobid_client = GrobidClient( + host_url=args.grobid_host, + ) + wayback_client = WaybackClient( + host_url=args.grobid_host, + ) worker = GrobidWorker( grobid_client=grobid_client, wayback_client=wayback_client, @@ -58,7 +62,9 @@ def run_pdf_extract(args): kafka_hosts=args.kafka_hosts, produce_topic=thumbnail_topic, ) - wayback_client = WaybackClient(host_url=args.grobid_host, ) + wayback_client = WaybackClient( + host_url=args.grobid_host, + ) worker = PdfExtractWorker( wayback_client=wayback_client, sink=pdftext_sink, @@ -187,7 +193,9 @@ def run_persist_html_teixml(args: argparse.Namespace) -> None: def run_persist_pdftrio(args): consume_topic = "sandcrawler-{}.pdftrio-output".format(args.env) - worker = PersistPdfTrioWorker(db_url=args.db_url, ) + worker = PersistPdfTrioWorker( + db_url=args.db_url, + ) pusher = KafkaJsonPusher( worker=worker, kafka_hosts=args.kafka_hosts, @@ -226,7 +234,9 @@ def run_ingest_file(args): kafka_hosts=args.kafka_hosts, produce_topic=grobid_topic, ) - grobid_client = GrobidClient(host_url=args.grobid_host, ) + grobid_client = GrobidClient( + host_url=args.grobid_host, + ) pdftext_sink = KafkaCompressSink( kafka_hosts=args.kafka_hosts, produce_topic=pdftext_topic, @@ -267,7 +277,9 @@ def run_ingest_file(args): def 
run_persist_ingest_file(args): consume_topic = "sandcrawler-{}.ingest-file-results".format(args.env) - worker = PersistIngestFileResultWorker(db_url=args.db_url, ) + worker = PersistIngestFileResultWorker( + db_url=args.db_url, + ) pusher = KafkaJsonPusher( worker=worker, kafka_hosts=args.kafka_hosts, @@ -281,114 +293,128 @@ def run_persist_ingest_file(args): def main(): parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument('--kafka-hosts', - default="localhost:9092", - help="list of Kafka brokers (host/port) to use") - parser.add_argument('--env', - default="dev", - help="Kafka topic namespace to use (eg, prod, qa, dev)") - parser.add_argument('--grobid-host', - default="http://grobid.qa.fatcat.wiki", - help="GROBID API host/port") - parser.add_argument('--db-url', - help="postgresql database connection string", - default="postgres:///sandcrawler") - parser.add_argument('--s3-url', help="S3 (seaweedfs) backend URL", default="localhost:9000") - parser.add_argument('--s3-access-key', - help="S3 (seaweedfs) credential", - default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') - or os.environ.get('MINIO_ACCESS_KEY')) - parser.add_argument('--s3-secret-key', - help="S3 (seaweedfs) credential", - default=os.environ.get('SANDCRAWLER_BLOB_SECRET_KEY') - or os.environ.get('MINIO_SECRET_KEY')) - parser.add_argument('--s3-bucket', - help="S3 (seaweedfs) bucket to persist into", - default="sandcrawler-dev") + parser.add_argument( + "--kafka-hosts", + default="localhost:9092", + help="list of Kafka brokers (host/port) to use", + ) + parser.add_argument( + "--env", default="dev", help="Kafka topic namespace to use (eg, prod, qa, dev)" + ) + parser.add_argument( + "--grobid-host", default="http://grobid.qa.fatcat.wiki", help="GROBID API host/port" + ) + parser.add_argument( + "--db-url", + help="postgresql database connection string", + default="postgres:///sandcrawler", + ) + parser.add_argument("--s3-url", help="S3 
(seaweedfs) backend URL", default="localhost:9000") + parser.add_argument( + "--s3-access-key", + help="S3 (seaweedfs) credential", + default=os.environ.get("SANDCRAWLER_BLOB_ACCESS_KEY") + or os.environ.get("MINIO_ACCESS_KEY"), + ) + parser.add_argument( + "--s3-secret-key", + help="S3 (seaweedfs) credential", + default=os.environ.get("SANDCRAWLER_BLOB_SECRET_KEY") + or os.environ.get("MINIO_SECRET_KEY"), + ) + parser.add_argument( + "--s3-bucket", help="S3 (seaweedfs) bucket to persist into", default="sandcrawler-dev" + ) subparsers = parser.add_subparsers() sub_grobid_extract = subparsers.add_parser( - 'grobid-extract', - help= - "daemon that consumes CDX JSON objects from Kafka, uses GROBID to extract XML, pushes to Kafka" + "grobid-extract", + help="daemon that consumes CDX JSON objects from Kafka, uses GROBID to extract XML, pushes to Kafka", ) sub_grobid_extract.set_defaults(func=run_grobid_extract) sub_pdf_extract = subparsers.add_parser( - 'pdf-extract', - help= - "daemon that consumes CDX JSON objects from Kafka, extracts text and thumbnail, pushes to Kafka" + "pdf-extract", + help="daemon that consumes CDX JSON objects from Kafka, extracts text and thumbnail, pushes to Kafka", ) sub_pdf_extract.set_defaults(func=run_pdf_extract) sub_persist_grobid = subparsers.add_parser( - 'persist-grobid', - help= - "daemon that consumes GROBID output from Kafka and pushes to S3 (seaweedfs) and postgres" + "persist-grobid", + help="daemon that consumes GROBID output from Kafka and pushes to S3 (seaweedfs) and postgres", + ) + sub_persist_grobid.add_argument( + "--s3-only", + action="store_true", + help="only upload TEI-XML to S3 (don't write to database)", ) - sub_persist_grobid.add_argument('--s3-only', - action='store_true', - help="only upload TEI-XML to S3 (don't write to database)") sub_persist_grobid.add_argument( - '--db-only', - action='store_true', - help="only write status to database (don't upload TEI-XML to S3)") + "--db-only", + action="store_true", + 
help="only write status to database (don't upload TEI-XML to S3)", + ) sub_persist_grobid.set_defaults(func=run_persist_grobid) sub_persist_pdftext = subparsers.add_parser( - 'persist-pdftext', - help= - "daemon that consumes pdftext output from Kafka and pushes to S3 (seaweedfs) and postgres" + "persist-pdftext", + help="daemon that consumes pdftext output from Kafka and pushes to S3 (seaweedfs) and postgres", + ) + sub_persist_pdftext.add_argument( + "--s3-only", + action="store_true", + help="only upload TEI-XML to S3 (don't write to database)", ) - sub_persist_pdftext.add_argument('--s3-only', - action='store_true', - help="only upload TEI-XML to S3 (don't write to database)") sub_persist_pdftext.add_argument( - '--db-only', - action='store_true', - help="only write status to database (don't upload TEI-XML to S3)") + "--db-only", + action="store_true", + help="only write status to database (don't upload TEI-XML to S3)", + ) sub_persist_pdftext.set_defaults(func=run_persist_pdftext) sub_persist_thumbnail = subparsers.add_parser( - 'persist-thumbnail', - help= - "daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres" + "persist-thumbnail", + help="daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres", ) sub_persist_thumbnail.set_defaults(func=run_persist_thumbnail) sub_persist_xml_doc = subparsers.add_parser( - 'persist-xml-doc', - help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket" + "persist-xml-doc", + help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket", ) sub_persist_xml_doc.set_defaults(func=run_persist_xml_doc) sub_persist_html_teixml = subparsers.add_parser( - 'persist-html-teixml', - help= - "daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket" + "persist-html-teixml", + help="daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket", ) 
sub_persist_html_teixml.set_defaults(func=run_persist_html_teixml) sub_persist_pdftrio = subparsers.add_parser( - 'persist-pdftrio', - help="daemon that consumes pdftrio output from Kafka and pushes to postgres") + "persist-pdftrio", + help="daemon that consumes pdftrio output from Kafka and pushes to postgres", + ) sub_persist_pdftrio.set_defaults(func=run_persist_pdftrio) sub_ingest_file = subparsers.add_parser( - 'ingest-file', - help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka") - sub_ingest_file.add_argument('--bulk', - action='store_true', - help="consume from bulk kafka topic (eg, for ingest backfill)") + "ingest-file", + help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka", + ) + sub_ingest_file.add_argument( + "--bulk", + action="store_true", + help="consume from bulk kafka topic (eg, for ingest backfill)", + ) sub_ingest_file.add_argument( - '--priority', - action='store_true', - help="consume from priority kafka topic (eg, for SPN requests)") + "--priority", + action="store_true", + help="consume from priority kafka topic (eg, for SPN requests)", + ) sub_ingest_file.set_defaults(func=run_ingest_file) sub_persist_ingest_file = subparsers.add_parser( - 'persist-ingest-file', - help="daemon that consumes ingest-file output from Kafka and pushes to postgres") + "persist-ingest-file", + help="daemon that consumes ingest-file output from Kafka and pushes to postgres", + ) sub_persist_ingest_file.set_defaults(func=run_persist_ingest_file) args = parser.parse_args() @@ -399,5 +425,5 @@ def main(): args.func(args) -if __name__ == '__main__': +if __name__ == "__main__": main() |