aboutsummaryrefslogtreecommitdiffstats
path: root/python/persist_tool.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/persist_tool.py')
-rwxr-xr-xpython/persist_tool.py311
1 files changed, 311 insertions, 0 deletions
diff --git a/python/persist_tool.py b/python/persist_tool.py
new file mode 100755
index 0000000..e08d66c
--- /dev/null
+++ b/python/persist_tool.py
@@ -0,0 +1,311 @@
+#!/usr/bin/env python3
+"""
+Commands for backfilling content from bulk files into postgresql and s3 (seaweedfs).
+
+Normally this is done by workers (in sandcrawler_worker.py) consuming from
+Kafka feeds, but sometimes we have bulk processing output we want to backfill.
+"""
+
+import argparse
+import os
+import sys
+
+from sandcrawler import *
+from sandcrawler.persist import *
+
+
+def run_cdx(args):
+ worker = PersistCdxWorker(
+ db_url=args.db_url,
+ )
+ filter_mimetypes = ["application/pdf"]
+ if args.no_mimetype_filter:
+ filter_mimetypes = None
+ pusher = CdxLinePusher(
+ worker,
+ args.cdx_file,
+ filter_http_statuses=[200, 226],
+ filter_mimetypes=filter_mimetypes,
+ # allow_octet_stream
+ batch_size=200,
+ )
+ pusher.run()
+
+
+def run_grobid(args):
+ worker = PersistGrobidWorker(
+ db_url=args.db_url,
+ s3_url=args.s3_url,
+ s3_bucket=args.s3_bucket,
+ s3_access_key=args.s3_access_key,
+ s3_secret_key=args.s3_secret_key,
+ s3_only=args.s3_only,
+ db_only=args.db_only,
+ )
+ pusher = JsonLinePusher(
+ worker,
+ args.json_file,
+ batch_size=50,
+ )
+ pusher.run()
+
+
+def run_grobid_disk(args):
+ """
+ Writes XML to individual files on disk, and also prints non-XML metadata to
+ stdout as JSON, which can be redirected to a separate file.
+ """
+ worker = PersistGrobidDiskWorker(
+ output_dir=args.output_dir,
+ )
+ pusher = JsonLinePusher(
+ worker,
+ args.json_file,
+ )
+ pusher.run()
+
+
+def run_pdftrio(args):
+ worker = PersistPdfTrioWorker(
+ db_url=args.db_url,
+ )
+ pusher = JsonLinePusher(
+ worker,
+ args.json_file,
+ batch_size=100,
+ )
+ pusher.run()
+
+
+def run_pdftext(args):
+ worker = PersistPdfTextWorker(
+ db_url=args.db_url,
+ s3_url=args.s3_url,
+ s3_bucket=args.s3_bucket,
+ s3_access_key=args.s3_access_key,
+ s3_secret_key=args.s3_secret_key,
+ s3_only=args.s3_only,
+ db_only=args.db_only,
+ )
+ pusher = JsonLinePusher(
+ worker,
+ args.json_file,
+ batch_size=50,
+ )
+ pusher.run()
+
+
+def run_ingest_file_result(args):
+ worker = PersistIngestFileResultWorker(
+ db_url=args.db_url,
+ )
+ pusher = JsonLinePusher(
+ worker,
+ args.json_file,
+ batch_size=200,
+ )
+ pusher.run()
+
+
+def run_ingest_request(args):
+ worker = PersistIngestRequestWorker(
+ db_url=args.db_url,
+ )
+ pusher = JsonLinePusher(
+ worker,
+ args.json_file,
+ batch_size=200,
+ )
+ pusher.run()
+
+
+def run_crossref(args):
+ batch_size = 200
+ if args.parse_refs:
+ batch_size = 10
+ grobid_client = GrobidClient(
+ host_url=args.grobid_host,
+ )
+ worker = PersistCrossrefWorker(
+ db_url=args.db_url,
+ grobid_client=grobid_client,
+ parse_refs=args.parse_refs,
+ )
+ pusher = JsonLinePusher(
+ worker,
+ args.json_file,
+ batch_size=batch_size,
+ )
+ pusher.run()
+
+
+def run_grobid_refs(args):
+ worker = PersistGrobidRefsWorker(
+ db_url=args.db_url,
+ )
+ pusher = JsonLinePusher(
+ worker,
+ args.json_file,
+ batch_size=100,
+ )
+ pusher.run()
+
+
+def main():
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser.add_argument(
+ "--db-url",
+ help="postgresql database connection string",
+ default="postgres:///sandcrawler",
+ )
+ parser.add_argument("--s3-url", help="S3 (seaweedfs) backend URL", default="localhost:9000")
+ parser.add_argument(
+ "--s3-access-key",
+ help="S3 (seaweedfs) credential",
+ default=os.environ.get("SANDCRAWLER_BLOB_ACCESS_KEY")
+ or os.environ.get("MINIO_ACCESS_KEY"),
+ )
+ parser.add_argument(
+ "--s3-secret-key",
+ help="S3 (seaweedfs) credential",
+ default=os.environ.get("SANDCRAWLER_BLOB_ACCESS_KEY")
+ or os.environ.get("MINIO_SECRET_KEY"),
+ )
+ parser.add_argument(
+ "--s3-bucket", help="S3 (seaweedfs) bucket to persist into", default="sandcrawler-dev"
+ )
+ subparsers = parser.add_subparsers()
+
+ sub_cdx = subparsers.add_parser("cdx", help="backfill a CDX file into postgresql cdx table")
+ sub_cdx.set_defaults(func=run_cdx)
+ sub_cdx.add_argument(
+ "cdx_file",
+ help="CDX file to import from (or '-' for stdin)",
+ type=argparse.FileType("r"),
+ )
+ sub_cdx.add_argument(
+ "--no-mimetype-filter",
+ action="store_true",
+ help="ignore mimetype filtering; insert all content types (eg, assuming pre-filtered)",
+ )
+
+ sub_grobid = subparsers.add_parser(
+ "grobid", help="backfill a grobid JSON ('pg') dump into postgresql and s3 (seaweedfs)"
+ )
+ sub_grobid.set_defaults(func=run_grobid)
+ sub_grobid.add_argument(
+ "json_file",
+ help="grobid file to import from (or '-' for stdin)",
+ type=argparse.FileType("r"),
+ )
+ sub_grobid.add_argument(
+ "--s3-only",
+ action="store_true",
+ help="only upload TEI-XML to S3 (don't write to database)",
+ )
+ sub_grobid.add_argument(
+ "--db-only",
+ action="store_true",
+ help="only write status to sandcrawler-db (don't save TEI-XML to S3)",
+ )
+
+ sub_pdftext = subparsers.add_parser(
+ "pdftext", help="backfill a pdftext JSON ('pg') dump into postgresql and s3 (seaweedfs)"
+ )
+ sub_pdftext.set_defaults(func=run_pdftext)
+ sub_pdftext.add_argument(
+ "json_file",
+ help="pdftext file to import from (or '-' for stdin)",
+ type=argparse.FileType("r"),
+ )
+ sub_pdftext.add_argument(
+ "--s3-only",
+ action="store_true",
+ help="only upload TEI-XML to S3 (don't write to database)",
+ )
+ sub_pdftext.add_argument(
+ "--db-only",
+ action="store_true",
+ help="only write status to sandcrawler-db (don't save TEI-XML to S3)",
+ )
+
+ sub_grobid_disk = subparsers.add_parser(
+ "grobid-disk", help="dump GRBOID output to (local) files on disk"
+ )
+ sub_grobid_disk.set_defaults(func=run_grobid_disk)
+ sub_grobid_disk.add_argument(
+ "json_file",
+ help="grobid file to import from (or '-' for stdin)",
+ type=argparse.FileType("r"),
+ )
+ sub_grobid_disk.add_argument("output_dir", help="base directory to output into", type=str)
+
+ sub_pdftrio = subparsers.add_parser(
+ "pdftrio", help="backfill a pdftrio JSON ('pg') dump into postgresql and s3 (seaweedfs)"
+ )
+ sub_pdftrio.set_defaults(func=run_pdftrio)
+ sub_pdftrio.add_argument(
+ "json_file",
+ help="pdftrio file to import from (or '-' for stdin)",
+ type=argparse.FileType("r"),
+ )
+
+ sub_ingest_file_result = subparsers.add_parser(
+ "ingest-file-result", help="backfill a ingest_file_result JSON dump into postgresql"
+ )
+ sub_ingest_file_result.set_defaults(func=run_ingest_file_result)
+ sub_ingest_file_result.add_argument(
+ "json_file",
+ help="ingest_file_result file to import from (or '-' for stdin)",
+ type=argparse.FileType("r"),
+ )
+
+ sub_ingest_request = subparsers.add_parser(
+ "ingest-request", help="backfill a ingest_request JSON dump into postgresql"
+ )
+ sub_ingest_request.set_defaults(func=run_ingest_request)
+ sub_ingest_request.add_argument(
+ "json_file",
+ help="ingest_request to import from (or '-' for stdin)",
+ type=argparse.FileType("r"),
+ )
+
+ sub_crossref = subparsers.add_parser(
+ "crossref",
+ help="backfill a crossref JSON dump into postgresql, and extract references at the same time",
+ )
+ sub_crossref.set_defaults(func=run_crossref)
+ sub_crossref.add_argument(
+ "json_file",
+ help="crossref file to import from (or '-' for stdin)",
+ type=argparse.FileType("r"),
+ )
+ sub_crossref.add_argument(
+ "--grobid-host", default="https://grobid.qa.fatcat.wiki", help="GROBID API host/port"
+ )
+ sub_crossref.add_argument(
+ "--parse-refs",
+ action="store_true",
+ help="use GROBID to parse any unstructured references (default is to not)",
+ )
+
+ sub_grobid_refs = subparsers.add_parser(
+ "grobid-refs", help="backfill a grobid_refs JSON dump into postgresql"
+ )
+ sub_grobid_refs.set_defaults(func=run_grobid_refs)
+ sub_grobid_refs.add_argument(
+ "json_file",
+ help="grobid_refs to import from (or '-' for stdin)",
+ type=argparse.FileType("r"),
+ )
+
+ args = parser.parse_args()
+ if not args.__dict__.get("func"):
+ print("Tell me what to do!", file=sys.stderr)
+ sys.exit(-1)
+
+ args.func(args)
+
+
+if __name__ == "__main__":
+ main()