Diffstat (limited to 'python/sandcrawler_worker.py')
-rwxr-xr-x | python/sandcrawler_worker.py | 277
1 file changed, 201 insertions(+), 76 deletions(-)
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index 6be8bac..aebcbe1 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -1,26 +1,23 @@
 #!/usr/bin/env python3
-
 """
 These are generally for continuously running workers that consume from Kafka.
 Outputs might either be pushed back into Kafka, or directly into sandcrawler-db
 or S3 (SeaweedFS).
 """
 
+import argparse
 import os
+import subprocess
 import sys
-import argparse
-import datetime
 
-import raven
-
-from sandcrawler import *
-from sandcrawler.persist import PersistXmlDocWorker, PersistHtmlTeiXmlWorker
+import sentry_sdk
 
-# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
-try:
-    git_sha = raven.fetch_git_sha('..')
-except Exception as e:
-    git_sha = None
-sentry_client = raven.Client(release=git_sha)
+from sandcrawler import *
+from sandcrawler.persist import (
+    PersistCrossrefWorker,
+    PersistHtmlTeiXmlWorker,
+    PersistXmlDocWorker,
+)
 
+
 def run_grobid_extract(args):
@@ -50,6 +47,7 @@ def run_grobid_extract(args):
     )
     pusher.run()
 
+
 def run_pdf_extract(args):
     consume_topic = "sandcrawler-{}.unextracted".format(args.env)
     pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env)
@@ -80,6 +78,7 @@ def run_pdf_extract(args):
     )
     pusher.run()
 
+
 def run_persist_grobid(args):
     consume_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
     worker = PersistGrobidWorker(
@@ -94,6 +93,8 @@ def run_persist_grobid(args):
     kafka_group = "persist-grobid"
     if args.s3_only:
         kafka_group += "-s3"
+    if args.kafka_group_suffix:
+        kafka_group += args.kafka_group_suffix
     pusher = KafkaJsonPusher(
         worker=worker,
         kafka_hosts=args.kafka_hosts,
@@ -104,6 +105,7 @@ def run_persist_grobid(args):
     )
     pusher.run()
 
+
 def run_persist_pdftext(args):
     consume_topic = "sandcrawler-{}.pdf-text".format(args.env)
     worker = PersistPdfTextWorker(
@@ -118,6 +120,8 @@ def run_persist_pdftext(args):
     kafka_group = "persist-pdf-text"
     if args.s3_only:
         kafka_group += "-s3"
+    if args.kafka_group_suffix:
+        kafka_group += args.kafka_group_suffix
     pusher = KafkaJsonPusher(
         worker=worker,
         kafka_hosts=args.kafka_hosts,
@@ -128,6 +132,7 @@ def run_persist_pdftext(args):
     )
     pusher.run()
 
+
 def run_persist_thumbnail(args):
     consume_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env)
     worker = PersistThumbnailWorker(
@@ -138,17 +143,21 @@ def run_persist_thumbnail(args):
         s3_extension=".180px.jpg",
         s3_folder="pdf",
     )
+    kafka_group = "persist-pdf-thumbnail"
+    if args.kafka_group_suffix:
+        kafka_group += args.kafka_group_suffix
     pusher = KafkaJsonPusher(
         worker=worker,
         kafka_hosts=args.kafka_hosts,
         consume_topic=consume_topic,
-        group="persist-pdf-thumbnail",
+        group=kafka_group,
         push_batches=False,
         raw_records=True,
         batch_size=25,
     )
     pusher.run()
 
+
 def run_persist_xml_doc(args: argparse.Namespace) -> None:
     consume_topic = f"sandcrawler-{args.env}.xml-doc"
     worker = PersistXmlDocWorker(
@@ -157,16 +166,20 @@ def run_persist_xml_doc(args: argparse.Namespace) -> None:
         s3_access_key=args.s3_access_key,
         s3_secret_key=args.s3_secret_key,
     )
+    kafka_group = "persist-xml-doc"
+    if args.kafka_group_suffix:
+        kafka_group += args.kafka_group_suffix
     pusher = KafkaJsonPusher(
         worker=worker,
         kafka_hosts=args.kafka_hosts,
         consume_topic=consume_topic,
-        group="persist-xml-doc",
+        group=kafka_group,
         push_batches=False,
         batch_size=25,
     )
     pusher.run()
 
+
 def run_persist_html_teixml(args: argparse.Namespace) -> None:
     consume_topic = f"sandcrawler-{args.env}.html-teixml"
     worker = PersistHtmlTeiXmlWorker(
@@ -175,16 +188,20 @@ def run_persist_html_teixml(args: argparse.Namespace) -> None:
         s3_access_key=args.s3_access_key,
         s3_secret_key=args.s3_secret_key,
     )
+    kafka_group = "persist-html-teixml"
+    if args.kafka_group_suffix:
+        kafka_group += args.kafka_group_suffix
     pusher = KafkaJsonPusher(
         worker=worker,
         kafka_hosts=args.kafka_hosts,
         consume_topic=consume_topic,
-        group="persist-html-teixml",
+        group=kafka_group,
         push_batches=False,
         batch_size=25,
     )
     pusher.run()
 
+
 def run_persist_pdftrio(args):
     consume_topic = "sandcrawler-{}.pdftrio-output".format(args.env)
     worker = PersistPdfTrioWorker(
@@ -200,13 +217,20 @@ def run_persist_pdftrio(args):
     )
     pusher.run()
 
+
 def run_ingest_file(args):
+    spn_cdx_retry_sec = 9.0
     if args.bulk:
         consume_group = "sandcrawler-{}-ingest-file-bulk".format(args.env)
         consume_topic = "sandcrawler-{}.ingest-file-requests-bulk".format(args.env)
+    elif args.priority:
+        spn_cdx_retry_sec = 45.0
+        consume_group = "sandcrawler-{}-ingest-file-priority".format(args.env)
+        consume_topic = "sandcrawler-{}.ingest-file-requests-priority".format(args.env)
     else:
+        spn_cdx_retry_sec = 1.0
         consume_group = "sandcrawler-{}-ingest-file".format(args.env)
-        consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
+        consume_topic = "sandcrawler-{}.ingest-file-requests-daily".format(args.env)
     produce_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
     grobid_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
     pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env)
@@ -248,8 +272,9 @@ def run_ingest_file(args):
         pdftext_sink=pdftext_sink,
         xmldoc_sink=xmldoc_sink,
         htmlteixml_sink=htmlteixml_sink,
-        # don't SPNv2 for --bulk backfill
-        try_spn2=not args.bulk,
+        # don't SPNv2 for --bulk or --skip-spn
+        try_spn2=not (args.bulk or args.skip_spn),
+        spn_cdx_retry_sec=spn_cdx_retry_sec,
     )
     pusher = KafkaJsonPusher(
         worker=worker,
@@ -260,6 +285,7 @@ def run_ingest_file(args):
     )
     pusher.run()
 
+
 def run_persist_ingest_file(args):
     consume_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
     worker = PersistIngestFileResultWorker(
@@ -275,96 +301,195 @@ def run_persist_ingest_file(args):
     )
     pusher.run()
 
+
+def run_persist_crossref(args):
+    batch_size = 200
+    if args.parse_refs:
+        batch_size = 10
+    grobid_client = GrobidClient(host_url=args.grobid_host)
+    consume_topic = "fatcat-{}.api-crossref".format(args.env)
+    worker = PersistCrossrefWorker(
+        db_url=args.db_url,
+        grobid_client=grobid_client,
+        parse_refs=args.parse_refs,
+    )
+    pusher = KafkaJsonPusher(
+        worker=worker,
+        kafka_hosts=args.kafka_hosts,
+        consume_topic=consume_topic,
+        group="persist-crossref",
+        push_batches=True,
+        # small batch size because doing GROBID processing
+        batch_size=batch_size,
+    )
+    pusher.run()
+
+
 def main():
-    parser = argparse.ArgumentParser(
-        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-    parser.add_argument('--kafka-hosts',
+    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument(
+        "--kafka-hosts",
         default="localhost:9092",
-        help="list of Kafka brokers (host/port) to use")
-    parser.add_argument('--env',
-        default="dev",
-        help="Kafka topic namespace to use (eg, prod, qa, dev)")
-    parser.add_argument('--grobid-host',
-        default="http://grobid.qa.fatcat.wiki",
-        help="GROBID API host/port")
-    parser.add_argument('--db-url',
+        help="list of Kafka brokers (host/port) to use",
+    )
+    parser.add_argument(
+        "--env", default="dev", help="Kafka topic namespace to use (eg, prod, qa, dev)"
+    )
+    parser.add_argument(
+        "--kafka-group-suffix", default="", help="Kafka consumer group suffix (optional)"
+    )
+    parser.add_argument(
+        "--grobid-host", default="https://grobid.qa.fatcat.wiki", help="GROBID API host/port"
+    )
+    parser.add_argument(
+        "--db-url",
         help="postgresql database connection string",
-        default="postgres:///sandcrawler")
-    parser.add_argument('--s3-url',
-        help="S3 (seaweedfs) backend URL",
-        default="localhost:9000")
-    parser.add_argument('--s3-access-key',
+        default="postgres:///sandcrawler",
+    )
+    parser.add_argument("--s3-url", help="S3 (seaweedfs) backend URL", default="localhost:9000")
+    parser.add_argument(
+        "--s3-access-key",
         help="S3 (seaweedfs) credential",
-        default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') or os.environ.get('MINIO_ACCESS_KEY'))
-    parser.add_argument('--s3-secret-key',
+        default=os.environ.get("SANDCRAWLER_BLOB_ACCESS_KEY")
+        or os.environ.get("MINIO_ACCESS_KEY"),
+    )
+    parser.add_argument(
+        "--s3-secret-key",
         help="S3 (seaweedfs) credential",
-        default=os.environ.get('SANDCRAWLER_BLOB_SECRET_KEY') or os.environ.get('MINIO_SECRET_KEY'))
-    parser.add_argument('--s3-bucket',
-        help="S3 (seaweedfs) bucket to persist into",
-        default="sandcrawler-dev")
+        default=os.environ.get("SANDCRAWLER_BLOB_SECRET_KEY")
+        or os.environ.get("MINIO_SECRET_KEY"),
+    )
+    parser.add_argument(
+        "--s3-bucket", help="S3 (seaweedfs) bucket to persist into", default="sandcrawler-dev"
+    )
     subparsers = parser.add_subparsers()
 
-    sub_grobid_extract = subparsers.add_parser('grobid-extract',
-        help="daemon that consumes CDX JSON objects from Kafka, uses GROBID to extract XML, pushes to Kafka")
+    sub_grobid_extract = subparsers.add_parser(
+        "grobid-extract",
+        help="daemon that consumes CDX JSON objects from Kafka, uses GROBID to extract XML, pushes to Kafka",
+    )
     sub_grobid_extract.set_defaults(func=run_grobid_extract)
 
-    sub_pdf_extract = subparsers.add_parser('pdf-extract',
-        help="daemon that consumes CDX JSON objects from Kafka, extracts text and thumbnail, pushes to Kafka")
+    sub_pdf_extract = subparsers.add_parser(
+        "pdf-extract",
+        help="daemon that consumes CDX JSON objects from Kafka, extracts text and thumbnail, pushes to Kafka",
+    )
     sub_pdf_extract.set_defaults(func=run_pdf_extract)
 
-    sub_persist_grobid = subparsers.add_parser('persist-grobid',
-        help="daemon that consumes GROBID output from Kafka and pushes to S3 (seaweedfs) and postgres")
-    sub_persist_grobid.add_argument('--s3-only',
-        action='store_true',
-        help="only upload TEI-XML to S3 (don't write to database)")
-    sub_persist_grobid.add_argument('--db-only',
-        action='store_true',
-        help="only write status to database (don't upload TEI-XML to S3)")
+    sub_persist_grobid = subparsers.add_parser(
+        "persist-grobid",
+        help="daemon that consumes GROBID output from Kafka and pushes to S3 (seaweedfs) and postgres",
+    )
+    sub_persist_grobid.add_argument(
+        "--s3-only",
+        action="store_true",
+        help="only upload TEI-XML to S3 (don't write to database)",
+    )
+    sub_persist_grobid.add_argument(
+        "--db-only",
+        action="store_true",
+        help="only write status to database (don't upload TEI-XML to S3)",
+    )
     sub_persist_grobid.set_defaults(func=run_persist_grobid)
 
-    sub_persist_pdftext = subparsers.add_parser('persist-pdftext',
-        help="daemon that consumes pdftext output from Kafka and pushes to S3 (seaweedfs) and postgres")
-    sub_persist_pdftext.add_argument('--s3-only',
-        action='store_true',
-        help="only upload TEI-XML to S3 (don't write to database)")
-    sub_persist_pdftext.add_argument('--db-only',
-        action='store_true',
-        help="only write status to database (don't upload TEI-XML to S3)")
+    sub_persist_pdftext = subparsers.add_parser(
+        "persist-pdftext",
+        help="daemon that consumes pdftext output from Kafka and pushes to S3 (seaweedfs) and postgres",
+    )
+    sub_persist_pdftext.add_argument(
+        "--s3-only",
+        action="store_true",
+        help="only upload TEI-XML to S3 (don't write to database)",
+    )
+    sub_persist_pdftext.add_argument(
+        "--db-only",
+        action="store_true",
+        help="only write status to database (don't upload TEI-XML to S3)",
+    )
     sub_persist_pdftext.set_defaults(func=run_persist_pdftext)
 
-    sub_persist_thumbnail = subparsers.add_parser('persist-thumbnail',
-        help="daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres")
+    sub_persist_thumbnail = subparsers.add_parser(
+        "persist-thumbnail",
+        help="daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres",
+    )
     sub_persist_thumbnail.set_defaults(func=run_persist_thumbnail)
 
-    sub_persist_xml_doc = subparsers.add_parser('persist-xml-doc',
-        help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket")
+    sub_persist_xml_doc = subparsers.add_parser(
+        "persist-xml-doc",
+        help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket",
+    )
     sub_persist_xml_doc.set_defaults(func=run_persist_xml_doc)
 
-    sub_persist_html_teixml = subparsers.add_parser('persist-html-teixml',
-        help="daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket")
+    sub_persist_html_teixml = subparsers.add_parser(
+        "persist-html-teixml",
+        help="daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket",
+    )
     sub_persist_html_teixml.set_defaults(func=run_persist_html_teixml)
 
-    sub_persist_pdftrio = subparsers.add_parser('persist-pdftrio',
-        help="daemon that consumes pdftrio output from Kafka and pushes to postgres")
+    sub_persist_pdftrio = subparsers.add_parser(
+        "persist-pdftrio",
+        help="daemon that consumes pdftrio output from Kafka and pushes to postgres",
+    )
     sub_persist_pdftrio.set_defaults(func=run_persist_pdftrio)
 
-    sub_ingest_file = subparsers.add_parser('ingest-file',
-        help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka")
-    sub_ingest_file.add_argument('--bulk',
-        action='store_true',
-        help="consume from bulk kafka topic (eg, for ingest backfill)")
+    sub_ingest_file = subparsers.add_parser(
+        "ingest-file",
+        help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka",
+    )
+    sub_ingest_file.add_argument(
+        "--bulk",
+        action="store_true",
+        help="consume from bulk kafka topic (eg, for ingest backfill)",
+    )
+    sub_ingest_file.add_argument(
+        "--skip-spn",
+        action="store_true",
+        help="don't do SPN lookups",
+    )
+    sub_ingest_file.add_argument(
+        "--priority",
+        action="store_true",
+        help="consume from priority kafka topic (eg, for SPN requests)",
+    )
     sub_ingest_file.set_defaults(func=run_ingest_file)
 
-    sub_persist_ingest_file = subparsers.add_parser('persist-ingest-file',
-        help="daemon that consumes ingest-file output from Kafka and pushes to postgres")
+    sub_persist_ingest_file = subparsers.add_parser(
+        "persist-ingest-file",
+        help="daemon that consumes ingest-file output from Kafka and pushes to postgres",
+    )
     sub_persist_ingest_file.set_defaults(func=run_persist_ingest_file)
 
+    sub_persist_crossref = subparsers.add_parser(
+        "persist-crossref",
+        help="daemon that persists crossref to postgres; also does GROBID ref transform",
+    )
+    sub_persist_crossref.add_argument(
+        "--grobid-host", default="https://grobid.qa.fatcat.wiki", help="GROBID API host/port"
+    )
+    sub_persist_crossref.add_argument(
+        "--parse-refs",
+        action="store_true",
+        help="use GROBID to parse any unstructured references (default is to not)",
+    )
+    sub_persist_crossref.set_defaults(func=run_persist_crossref)
+
     args = parser.parse_args()
     if not args.__dict__.get("func"):
         parser.print_help(file=sys.stderr)
         sys.exit(-1)
 
+    # configure sentry *after* parsing args
+    try:
+        GIT_REVISION = (
+            subprocess.check_output(["git", "describe", "--always"]).strip().decode("utf-8")
+        )
+    except Exception:
+        print("failed to configure git revision", file=sys.stderr)
+        GIT_REVISION = None
+    sentry_sdk.init(release=GIT_REVISION, environment=args.env, max_breadcrumbs=10)
+
     args.func(args)
 
-if __name__ == '__main__':
+
+if __name__ == "__main__":
     main()
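
Note: the commit replaces the old module-level raven client with sentry_sdk initialized inside main(), after argument parsing, so the Kafka env can be passed as the Sentry environment. A minimal standalone sketch of the same release-tagging pattern (the "dev" environment value is illustrative; the worker passes args.env):

    #!/usr/bin/env python3
    # Sketch: derive a release identifier from git, then hand it to
    # sentry_sdk.init(). The DSN is read from the SENTRY_DSN environment
    # variable when not passed explicitly.
    import subprocess

    import sentry_sdk

    try:
        git_revision = (
            subprocess.check_output(["git", "describe", "--always"]).strip().decode("utf-8")
        )
    except Exception:
        # e.g. not running inside a git checkout
        git_revision = None

    sentry_sdk.init(release=git_revision, environment="dev", max_breadcrumbs=10)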
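
For context, example invocations exercising the options this commit adds (env names and suffix values here are hypothetical, not taken from the commit):

    # consume from the new priority topic (longer SPN CDX retry window)
    ./sandcrawler_worker.py --env prod ingest-file --priority

    # daily-topic worker with SPNv2 lookups disabled
    ./sandcrawler_worker.py --env prod ingest-file --skip-spn

    # run a parallel consumer under a distinct Kafka consumer group
    ./sandcrawler_worker.py --env prod --kafka-group-suffix "-backfill" persist-grobid --s3-only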