aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler_worker.py')
-rwxr-xr-xpython/sandcrawler_worker.py151
1 files changed, 87 insertions, 64 deletions
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index e185fad..3c76c17 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
-
"""
These are generally for continuously running workers that consume from Kafka.
Outputs might either be pushed back into Kafka, or directly into sandcrawler-db
@@ -31,12 +30,8 @@ def run_grobid_extract(args):
kafka_hosts=args.kafka_hosts,
produce_topic=produce_topic,
)
- grobid_client = GrobidClient(
- host_url=args.grobid_host,
- )
- wayback_client = WaybackClient(
- host_url=args.grobid_host,
- )
+ grobid_client = GrobidClient(host_url=args.grobid_host, )
+ wayback_client = WaybackClient(host_url=args.grobid_host, )
worker = GrobidWorker(
grobid_client=grobid_client,
wayback_client=wayback_client,
@@ -51,6 +46,7 @@ def run_grobid_extract(args):
)
pusher.run()
+
def run_pdf_extract(args):
consume_topic = "sandcrawler-{}.unextracted".format(args.env)
pdftext_topic = "sandcrawler-{}.pdf-text".format(args.env)
@@ -63,9 +59,7 @@ def run_pdf_extract(args):
kafka_hosts=args.kafka_hosts,
produce_topic=thumbnail_topic,
)
- wayback_client = WaybackClient(
- host_url=args.grobid_host,
- )
+ wayback_client = WaybackClient(host_url=args.grobid_host, )
worker = PdfExtractWorker(
wayback_client=wayback_client,
sink=pdftext_sink,
@@ -81,6 +75,7 @@ def run_pdf_extract(args):
)
pusher.run()
+
def run_persist_grobid(args):
consume_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
worker = PersistGrobidWorker(
@@ -105,6 +100,7 @@ def run_persist_grobid(args):
)
pusher.run()
+
def run_persist_pdftext(args):
consume_topic = "sandcrawler-{}.pdf-text".format(args.env)
worker = PersistPdfTextWorker(
@@ -129,6 +125,7 @@ def run_persist_pdftext(args):
)
pusher.run()
+
def run_persist_thumbnail(args):
consume_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.env)
worker = PersistThumbnailWorker(
@@ -150,6 +147,7 @@ def run_persist_thumbnail(args):
)
pusher.run()
+
def run_persist_xml_doc(args: argparse.Namespace) -> None:
consume_topic = f"sandcrawler-{args.env}.xml-doc"
worker = PersistXmlDocWorker(
@@ -168,6 +166,7 @@ def run_persist_xml_doc(args: argparse.Namespace) -> None:
)
pusher.run()
+
def run_persist_html_teixml(args: argparse.Namespace) -> None:
consume_topic = f"sandcrawler-{args.env}.html-teixml"
worker = PersistHtmlTeiXmlWorker(
@@ -186,11 +185,10 @@ def run_persist_html_teixml(args: argparse.Namespace) -> None:
)
pusher.run()
+
def run_persist_pdftrio(args):
consume_topic = "sandcrawler-{}.pdftrio-output".format(args.env)
- worker = PersistPdfTrioWorker(
- db_url=args.db_url,
- )
+ worker = PersistPdfTrioWorker(db_url=args.db_url, )
pusher = KafkaJsonPusher(
worker=worker,
kafka_hosts=args.kafka_hosts,
@@ -201,6 +199,7 @@ def run_persist_pdftrio(args):
)
pusher.run()
+
def run_ingest_file(args):
spn_cdx_retry_sec = 9.0
if args.bulk:
@@ -228,9 +227,7 @@ def run_ingest_file(args):
kafka_hosts=args.kafka_hosts,
produce_topic=grobid_topic,
)
- grobid_client = GrobidClient(
- host_url=args.grobid_host,
- )
+ grobid_client = GrobidClient(host_url=args.grobid_host, )
pdftext_sink = KafkaCompressSink(
kafka_hosts=args.kafka_hosts,
produce_topic=pdftext_topic,
@@ -268,11 +265,10 @@ def run_ingest_file(args):
)
pusher.run()
+
def run_persist_ingest_file(args):
consume_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
- worker = PersistIngestFileResultWorker(
- db_url=args.db_url,
- )
+ worker = PersistIngestFileResultWorker(db_url=args.db_url, )
pusher = KafkaJsonPusher(
worker=worker,
kafka_hosts=args.kafka_hosts,
@@ -283,90 +279,116 @@ def run_persist_ingest_file(args):
)
pusher.run()
+
def main():
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--kafka-hosts',
- default="localhost:9092",
- help="list of Kafka brokers (host/port) to use")
+ default="localhost:9092",
+ help="list of Kafka brokers (host/port) to use")
parser.add_argument('--env',
- default="dev",
- help="Kafka topic namespace to use (eg, prod, qa, dev)")
+ default="dev",
+ help="Kafka topic namespace to use (eg, prod, qa, dev)")
parser.add_argument('--grobid-host',
- default="http://grobid.qa.fatcat.wiki",
- help="GROBID API host/port")
+ default="http://grobid.qa.fatcat.wiki",
+ help="GROBID API host/port")
parser.add_argument('--db-url',
- help="postgresql database connection string",
- default="postgres:///sandcrawler")
- parser.add_argument('--s3-url',
- help="S3 (seaweedfs) backend URL",
- default="localhost:9000")
+ help="postgresql database connection string",
+ default="postgres:///sandcrawler")
+ parser.add_argument('--s3-url', help="S3 (seaweedfs) backend URL", default="localhost:9000")
parser.add_argument('--s3-access-key',
- help="S3 (seaweedfs) credential",
- default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') or os.environ.get('MINIO_ACCESS_KEY'))
+ help="S3 (seaweedfs) credential",
+ default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY')
+ or os.environ.get('MINIO_ACCESS_KEY'))
parser.add_argument('--s3-secret-key',
- help="S3 (seaweedfs) credential",
- default=os.environ.get('SANDCRAWLER_BLOB_SECRET_KEY') or os.environ.get('MINIO_SECRET_KEY'))
+ help="S3 (seaweedfs) credential",
+ default=os.environ.get('SANDCRAWLER_BLOB_SECRET_KEY')
+ or os.environ.get('MINIO_SECRET_KEY'))
parser.add_argument('--s3-bucket',
- help="S3 (seaweedfs) bucket to persist into",
- default="sandcrawler-dev")
+ help="S3 (seaweedfs) bucket to persist into",
+ default="sandcrawler-dev")
subparsers = parser.add_subparsers()
- sub_grobid_extract = subparsers.add_parser('grobid-extract',
- help="daemon that consumes CDX JSON objects from Kafka, uses GROBID to extract XML, pushes to Kafka")
+ sub_grobid_extract = subparsers.add_parser(
+ 'grobid-extract',
+ help=
+ "daemon that consumes CDX JSON objects from Kafka, uses GROBID to extract XML, pushes to Kafka"
+ )
sub_grobid_extract.set_defaults(func=run_grobid_extract)
- sub_pdf_extract = subparsers.add_parser('pdf-extract',
- help="daemon that consumes CDX JSON objects from Kafka, extracts text and thumbnail, pushes to Kafka")
+ sub_pdf_extract = subparsers.add_parser(
+ 'pdf-extract',
+ help=
+ "daemon that consumes CDX JSON objects from Kafka, extracts text and thumbnail, pushes to Kafka"
+ )
sub_pdf_extract.set_defaults(func=run_pdf_extract)
- sub_persist_grobid = subparsers.add_parser('persist-grobid',
- help="daemon that consumes GROBID output from Kafka and pushes to S3 (seaweedfs) and postgres")
+ sub_persist_grobid = subparsers.add_parser(
+ 'persist-grobid',
+ help=
+ "daemon that consumes GROBID output from Kafka and pushes to S3 (seaweedfs) and postgres"
+ )
sub_persist_grobid.add_argument('--s3-only',
- action='store_true',
- help="only upload TEI-XML to S3 (don't write to database)")
- sub_persist_grobid.add_argument('--db-only',
+ action='store_true',
+ help="only upload TEI-XML to S3 (don't write to database)")
+ sub_persist_grobid.add_argument(
+ '--db-only',
action='store_true',
help="only write status to database (don't upload TEI-XML to S3)")
sub_persist_grobid.set_defaults(func=run_persist_grobid)
- sub_persist_pdftext = subparsers.add_parser('persist-pdftext',
- help="daemon that consumes pdftext output from Kafka and pushes to S3 (seaweedfs) and postgres")
+ sub_persist_pdftext = subparsers.add_parser(
+ 'persist-pdftext',
+ help=
+ "daemon that consumes pdftext output from Kafka and pushes to S3 (seaweedfs) and postgres"
+ )
sub_persist_pdftext.add_argument('--s3-only',
- action='store_true',
- help="only upload TEI-XML to S3 (don't write to database)")
- sub_persist_pdftext.add_argument('--db-only',
+ action='store_true',
+ help="only upload TEI-XML to S3 (don't write to database)")
+ sub_persist_pdftext.add_argument(
+ '--db-only',
action='store_true',
help="only write status to database (don't upload TEI-XML to S3)")
sub_persist_pdftext.set_defaults(func=run_persist_pdftext)
- sub_persist_thumbnail = subparsers.add_parser('persist-thumbnail',
- help="daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres")
+ sub_persist_thumbnail = subparsers.add_parser(
+ 'persist-thumbnail',
+ help=
+ "daemon that consumes thumbnail output from Kafka and pushes to S3 (seaweedfs) and postgres"
+ )
sub_persist_thumbnail.set_defaults(func=run_persist_thumbnail)
- sub_persist_xml_doc = subparsers.add_parser('persist-xml-doc',
- help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket")
+ sub_persist_xml_doc = subparsers.add_parser(
+ 'persist-xml-doc',
+ help="daemon that consumes xml-doc output from Kafka and pushes to S3 (seaweedfs) bucket"
+ )
sub_persist_xml_doc.set_defaults(func=run_persist_xml_doc)
- sub_persist_html_teixml = subparsers.add_parser('persist-html-teixml',
- help="daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket")
+ sub_persist_html_teixml = subparsers.add_parser(
+ 'persist-html-teixml',
+ help=
+ "daemon that consumes html-teixml output from Kafka and pushes to S3 (seaweedfs) bucket"
+ )
sub_persist_html_teixml.set_defaults(func=run_persist_html_teixml)
- sub_persist_pdftrio = subparsers.add_parser('persist-pdftrio',
+ sub_persist_pdftrio = subparsers.add_parser(
+ 'persist-pdftrio',
help="daemon that consumes pdftrio output from Kafka and pushes to postgres")
sub_persist_pdftrio.set_defaults(func=run_persist_pdftrio)
- sub_ingest_file = subparsers.add_parser('ingest-file',
+ sub_ingest_file = subparsers.add_parser(
+ 'ingest-file',
help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka")
sub_ingest_file.add_argument('--bulk',
- action='store_true',
- help="consume from bulk kafka topic (eg, for ingest backfill)")
- sub_ingest_file.add_argument('--priority',
+ action='store_true',
+ help="consume from bulk kafka topic (eg, for ingest backfill)")
+ sub_ingest_file.add_argument(
+ '--priority',
action='store_true',
help="consume from priority kafka topic (eg, for SPN requests)")
sub_ingest_file.set_defaults(func=run_ingest_file)
- sub_persist_ingest_file = subparsers.add_parser('persist-ingest-file',
+ sub_persist_ingest_file = subparsers.add_parser(
+ 'persist-ingest-file',
help="daemon that consumes ingest-file output from Kafka and pushes to postgres")
sub_persist_ingest_file.set_defaults(func=run_persist_ingest_file)
@@ -377,5 +399,6 @@ def main():
args.func(args)
+
if __name__ == '__main__':
main()