aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rwxr-xr-xpython/ingest_tool.py76
1 files changed, 76 insertions, 0 deletions
diff --git a/python/ingest_tool.py b/python/ingest_tool.py
index 7405d28..60a59d2 100755
--- a/python/ingest_tool.py
+++ b/python/ingest_tool.py
@@ -5,6 +5,7 @@ import json
import sys
from http.server import HTTPServer
+from sandcrawler import GrobidClient, JsonLinePusher, KafkaCompressSink, KafkaSink
from sandcrawler.ingest_file import IngestFileRequestHandler, IngestFileWorker
from sandcrawler.ingest_fileset import IngestFilesetWorker
@@ -55,6 +56,58 @@ def run_requests(args):
print(json.dumps(result, sort_keys=True))
+def run_file_requests_backfill(args):
+ """
+ Special mode for persisting GROBID and pdfextract results to Kafka, but
+ printing ingest result to stdout.
+
+ Can be used to batch re-process known files.
+ """
+ grobid_topic = "sandcrawler-{}.grobid-output-pg".format(args.kafka_env)
+ pdftext_topic = "sandcrawler-{}.pdf-text".format(args.kafka_env)
+ thumbnail_topic = "sandcrawler-{}.pdf-thumbnail-180px-jpg".format(args.kafka_env)
+ xmldoc_topic = "sandcrawler-{}.xml-doc".format(args.kafka_env)
+ htmlteixml_topic = "sandcrawler-{}.html-teixml".format(args.kafka_env)
+ grobid_sink = KafkaSink(
+ kafka_hosts=args.kafka_hosts,
+ produce_topic=grobid_topic,
+ )
+ grobid_client = GrobidClient(
+ host_url=args.grobid_host,
+ )
+ pdftext_sink = KafkaCompressSink(
+ kafka_hosts=args.kafka_hosts,
+ produce_topic=pdftext_topic,
+ )
+ thumbnail_sink = KafkaSink(
+ kafka_hosts=args.kafka_hosts,
+ produce_topic=thumbnail_topic,
+ )
+ xmldoc_sink = KafkaSink(
+ kafka_hosts=args.kafka_hosts,
+ produce_topic=xmldoc_topic,
+ )
+ htmlteixml_sink = KafkaSink(
+ kafka_hosts=args.kafka_hosts,
+ produce_topic=htmlteixml_topic,
+ )
+ worker = IngestFileWorker(
+ grobid_client=grobid_client,
+ sink=None,
+ grobid_sink=grobid_sink,
+ thumbnail_sink=thumbnail_sink,
+ pdftext_sink=pdftext_sink,
+ xmldoc_sink=xmldoc_sink,
+ htmlteixml_sink=htmlteixml_sink,
+ try_spn2=False,
+ )
+ pusher = JsonLinePusher(
+ worker,
+ args.json_file,
+ )
+ pusher.run()
+
+
def run_api(args):
port = 8083
print("Listening on localhost:{}".format(port))
@@ -113,6 +166,29 @@ def main():
sub_api.set_defaults(func=run_api)
sub_api.add_argument("--port", help="HTTP port to listen on", default=8033, type=int)
+ sub_file_requests_backfill = subparsers.add_parser(
+ "file-requests-backfill",
+ help="starts a simple HTTP server that processes ingest requests",
+ )
+ sub_file_requests_backfill.set_defaults(func=run_file_requests_backfill)
+ sub_file_requests_backfill.add_argument(
+ "json_file",
+ help="JSON file (request per line) to import from (or stdin)",
+ default=sys.stdin,
+ type=argparse.FileType("r"),
+ )
+ sub_file_requests_backfill.add_argument(
+ "--kafka-hosts",
+ default="localhost:9092",
+ help="list of Kafka brokers (host/port) to use",
+ )
+ sub_file_requests_backfill.add_argument(
+ "--kafka-env", default="dev", help="Kafka topic namespace to use (eg, prod, qa, dev)"
+ )
+ sub_file_requests_backfill.add_argument(
+ "--grobid-host", default="https://grobid.qa.fatcat.wiki", help="GROBID API host/port"
+ )
+
args = parser.parse_args()
if not args.__dict__.get("func"):
parser.print_help(file=sys.stderr)