diff options
Diffstat (limited to 'python')
-rwxr-xr-x | python/ingest_tool.py | 76 |
1 files changed, 76 insertions, 0 deletions
diff --git a/python/ingest_tool.py b/python/ingest_tool.py index 7405d28..60a59d2 100755 --- a/python/ingest_tool.py +++ b/python/ingest_tool.py @@ -5,6 +5,7 @@ import json import sys from http.server import HTTPServer +from sandcrawler import GrobidClient, JsonLinePusher, KafkaCompressSink, KafkaSink from sandcrawler.ingest_file import IngestFileRequestHandler, IngestFileWorker from sandcrawler.ingest_fileset import IngestFilesetWorker @@ -55,6 +56,58 @@ def run_requests(args): print(json.dumps(result, sort_keys=True)) +def run_file_requests_backfill(args): + """ + Special mode for persisting GROBID and pdfextract results to Kafka, but + printing ingest result to stdout. + + Can be used to batch re-process known files. + """ + grobid_topic = "sandcrawler-{}.grobid-output-pg".format(args.kafka_env) + pdftext_topic = "sandcrawler-{}.pdf-text".format(args.kafka_env) + thumbnail_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.kafka_env) + xmldoc_topic = "sandcrawler-{}.xml-doc".format(args.kafka_env) + htmlteixml_topic = "sandcrawler-{}.html-teixml".format(args.kafka_env) + grobid_sink = KafkaSink( + kafka_hosts=args.kafka_hosts, + produce_topic=grobid_topic, + ) + grobid_client = GrobidClient( + host_url=args.grobid_host, + ) + pdftext_sink = KafkaCompressSink( + kafka_hosts=args.kafka_hosts, + produce_topic=pdftext_topic, + ) + thumbnail_sink = KafkaSink( + kafka_hosts=args.kafka_hosts, + produce_topic=thumbnail_topic, + ) + xmldoc_sink = KafkaSink( + kafka_hosts=args.kafka_hosts, + produce_topic=xmldoc_topic, + ) + htmlteixml_sink = KafkaSink( + kafka_hosts=args.kafka_hosts, + produce_topic=htmlteixml_topic, + ) + worker = IngestFileWorker( + grobid_client=grobid_client, + sink=None, + grobid_sink=grobid_sink, + thumbnail_sink=thumbnail_sink, + pdftext_sink=pdftext_sink, + xmldoc_sink=xmldoc_sink, + htmlteixml_sink=htmlteixml_sink, + try_spn2=False, + ) + pusher = JsonLinePusher( + worker, + args.json_file, + ) + pusher.run() + + def run_api(args): port = 8083 print("Listening on localhost:{}".format(port)) @@ -113,6 +166,29 @@ def main(): sub_api.set_defaults(func=run_api) sub_api.add_argument("--port", help="HTTP port to listen on", default=8033, type=int) + sub_file_requests_backfill = subparsers.add_parser( + "file-requests-backfill", + help="starts a simple HTTP server that processes ingest requests", + ) + sub_file_requests_backfill.set_defaults(func=run_file_requests_backfill) + sub_file_requests_backfill.add_argument( + "json_file", + help="JSON file (request per line) to import from (or stdin)", + default=sys.stdin, + type=argparse.FileType("r"), + ) + sub_file_requests_backfill.add_argument( + "--kafka-hosts", + default="localhost:9092", + help="list of Kafka brokers (host/port) to use", + ) + sub_file_requests_backfill.add_argument( + "--kafka-env", default="dev", help="Kafka topic namespace to use (eg, prod, qa, dev)" + ) + sub_file_requests_backfill.add_argument( + "--grobid-host", default="https://grobid.qa.fatcat.wiki", help="GROBID API host/port" + ) + args = parser.parse_args() if not args.__dict__.get("func"): parser.print_help(file=sys.stderr) |