Diffstat (limited to 'python/persist_tool.py')
-rwxr-xr-x  python/persist_tool.py  234
1 file changed, 167 insertions, 67 deletions
diff --git a/python/persist_tool.py b/python/persist_tool.py
index 69e9374..e08d66c 100755
--- a/python/persist_tool.py
+++ b/python/persist_tool.py
@@ -1,5 +1,4 @@
 #!/usr/bin/env python3
-
 """
 Commands for backfilling content from bulk files into postgresql and s3 (seaweedfs).
 
@@ -7,9 +6,9 @@ Normally this is done by workers (in sandcrawler_worker.py) consuming from
 Kafka feeds, but sometimes we have bulk processing output we want to backfill.
 """
 
+import argparse
 import os
 import sys
-import argparse
 
 from sandcrawler import *
 from sandcrawler.persist import *
@@ -19,7 +18,7 @@ def run_cdx(args):
     worker = PersistCdxWorker(
         db_url=args.db_url,
     )
-    filter_mimetypes = ['application/pdf']
+    filter_mimetypes = ["application/pdf"]
     if args.no_mimetype_filter:
         filter_mimetypes = None
     pusher = CdxLinePusher(
@@ -27,11 +26,12 @@ def run_cdx(args):
         worker,
         args.cdx_file,
         filter_http_statuses=[200, 226],
         filter_mimetypes=filter_mimetypes,
-        #allow_octet_stream
+        # allow_octet_stream
         batch_size=200,
     )
     pusher.run()
 
+
 def run_grobid(args):
     worker = PersistGrobidWorker(
         db_url=args.db_url,
@@ -49,6 +49,7 @@ def run_grobid(args):
     )
     pusher.run()
 
+
 def run_grobid_disk(args):
     """
     Writes XML to individual files on disk, and also prints non-XML metadata to
@@ -63,6 +64,7 @@ def run_grobid_disk(args):
     )
     pusher.run()
 
+
 def run_pdftrio(args):
     worker = PersistPdfTrioWorker(
         db_url=args.db_url,
@@ -74,6 +76,7 @@ def run_pdftrio(args):
     )
     pusher.run()
 
+
 def run_pdftext(args):
     worker = PersistPdfTextWorker(
         db_url=args.db_url,
@@ -91,6 +94,7 @@ def run_pdftext(args):
     )
     pusher.run()
 
+
 def run_ingest_file_result(args):
     worker = PersistIngestFileResultWorker(
         db_url=args.db_url,
@@ -102,6 +106,7 @@ def run_ingest_file_result(args):
     )
     pusher.run()
 
+
 def run_ingest_request(args):
     worker = PersistIngestRequestWorker(
         db_url=args.db_url,
@@ -113,92 +118,186 @@ def run_ingest_request(args):
     )
     pusher.run()
 
+
+def run_crossref(args):
+    batch_size = 200
+    if args.parse_refs:
+        batch_size = 10
+    grobid_client = GrobidClient(
+        host_url=args.grobid_host,
+    )
+    worker = PersistCrossrefWorker(
+        db_url=args.db_url,
+        grobid_client=grobid_client,
+        parse_refs=args.parse_refs,
+    )
+    pusher = JsonLinePusher(
+        worker,
+        args.json_file,
+        batch_size=batch_size,
+    )
+    pusher.run()
+
+
+def run_grobid_refs(args):
+    worker = PersistGrobidRefsWorker(
+        db_url=args.db_url,
+    )
+    pusher = JsonLinePusher(
+        worker,
+        args.json_file,
+        batch_size=100,
+    )
+    pusher.run()
+
+
 def main():
-    parser = argparse.ArgumentParser(
-        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-    parser.add_argument('--db-url',
+    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument(
+        "--db-url",
         help="postgresql database connection string",
-        default="postgres:///sandcrawler")
-    parser.add_argument('--s3-url',
-        help="S3 (seaweedfs) backend URL",
-        default="localhost:9000")
-    parser.add_argument('--s3-access-key',
+        default="postgres:///sandcrawler",
+    )
+    parser.add_argument("--s3-url", help="S3 (seaweedfs) backend URL", default="localhost:9000")
+    parser.add_argument(
+        "--s3-access-key",
         help="S3 (seaweedfs) credential",
-        default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') or os.environ.get('MINIO_ACCESS_KEY'))
-    parser.add_argument('--s3-secret-key',
+        default=os.environ.get("SANDCRAWLER_BLOB_ACCESS_KEY")
+        or os.environ.get("MINIO_ACCESS_KEY"),
+    )
+    parser.add_argument(
+        "--s3-secret-key",
         help="S3 (seaweedfs) credential",
-        default=os.environ.get('SANDCRAWLER_BLOB_ACCESS_KEY') or os.environ.get('MINIO_SECRET_KEY'))
-    parser.add_argument('--s3-bucket',
-        help="S3 (seaweedfs) bucket to persist into",
-        default="sandcrawler-dev")
+        default=os.environ.get("SANDCRAWLER_BLOB_ACCESS_KEY")
+        or os.environ.get("MINIO_SECRET_KEY"),
+    )
+    parser.add_argument(
+        "--s3-bucket", help="S3 (seaweedfs) bucket to persist into", default="sandcrawler-dev"
+    )
     subparsers = parser.add_subparsers()
 
-    sub_cdx = subparsers.add_parser('cdx',
-        help="backfill a CDX file into postgresql cdx table")
+    sub_cdx = subparsers.add_parser("cdx", help="backfill a CDX file into postgresql cdx table")
    sub_cdx.set_defaults(func=run_cdx)
-    sub_cdx.add_argument('cdx_file',
+    sub_cdx.add_argument(
+        "cdx_file",
         help="CDX file to import from (or '-' for stdin)",
-        type=argparse.FileType('r'))
-    sub_cdx.add_argument('--no-mimetype-filter',
-        action='store_true',
-        help="ignore mimetype filtering; insert all content types (eg, assuming pre-filtered)")
+        type=argparse.FileType("r"),
+    )
+    sub_cdx.add_argument(
+        "--no-mimetype-filter",
+        action="store_true",
+        help="ignore mimetype filtering; insert all content types (eg, assuming pre-filtered)",
+    )
 
-    sub_grobid = subparsers.add_parser('grobid',
-        help="backfill a grobid JSON ('pg') dump into postgresql and s3 (seaweedfs)")
+    sub_grobid = subparsers.add_parser(
+        "grobid", help="backfill a grobid JSON ('pg') dump into postgresql and s3 (seaweedfs)"
+    )
     sub_grobid.set_defaults(func=run_grobid)
-    sub_grobid.add_argument('json_file',
+    sub_grobid.add_argument(
+        "json_file",
         help="grobid file to import from (or '-' for stdin)",
-        type=argparse.FileType('r'))
-    sub_grobid.add_argument('--s3-only',
-        action='store_true',
-        help="only upload TEI-XML to S3 (don't write to database)")
-    sub_grobid.add_argument('--db-only',
-        action='store_true',
-        help="only write status to sandcrawler-db (don't save TEI-XML to S3)")
-
-    sub_pdftext = subparsers.add_parser('pdftext',
-        help="backfill a pdftext JSON ('pg') dump into postgresql and s3 (seaweedfs)")
+        type=argparse.FileType("r"),
+    )
+    sub_grobid.add_argument(
+        "--s3-only",
+        action="store_true",
+        help="only upload TEI-XML to S3 (don't write to database)",
+    )
+    sub_grobid.add_argument(
+        "--db-only",
+        action="store_true",
+        help="only write status to sandcrawler-db (don't save TEI-XML to S3)",
+    )
+
+    sub_pdftext = subparsers.add_parser(
+        "pdftext", help="backfill a pdftext JSON ('pg') dump into postgresql and s3 (seaweedfs)"
+    )
     sub_pdftext.set_defaults(func=run_pdftext)
-    sub_pdftext.add_argument('json_file',
+    sub_pdftext.add_argument(
+        "json_file",
         help="pdftext file to import from (or '-' for stdin)",
-        type=argparse.FileType('r'))
-    sub_pdftext.add_argument('--s3-only',
-        action='store_true',
-        help="only upload TEI-XML to S3 (don't write to database)")
-    sub_pdftext.add_argument('--db-only',
-        action='store_true',
-        help="only write status to sandcrawler-db (don't save TEI-XML to S3)")
-
-    sub_grobid_disk = subparsers.add_parser('grobid-disk',
-        help="dump GRBOID output to (local) files on disk")
+        type=argparse.FileType("r"),
+    )
+    sub_pdftext.add_argument(
+        "--s3-only",
+        action="store_true",
+        help="only upload TEI-XML to S3 (don't write to database)",
+    )
+    sub_pdftext.add_argument(
+        "--db-only",
+        action="store_true",
+        help="only write status to sandcrawler-db (don't save TEI-XML to S3)",
+    )
+
+    sub_grobid_disk = subparsers.add_parser(
+        "grobid-disk", help="dump GRBOID output to (local) files on disk"
+    )
     sub_grobid_disk.set_defaults(func=run_grobid_disk)
-    sub_grobid_disk.add_argument('json_file',
+    sub_grobid_disk.add_argument(
+        "json_file",
         help="grobid file to import from (or '-' for stdin)",
-        type=argparse.FileType('r'))
-    sub_grobid_disk.add_argument('output_dir',
-        help="base directory to output into",
-        type=str)
+        type=argparse.FileType("r"),
+    )
+    sub_grobid_disk.add_argument("output_dir", help="base directory to output into", type=str)
 
-    sub_pdftrio = subparsers.add_parser('pdftrio',
-        help="backfill a pdftrio JSON ('pg') dump into postgresql and s3 (seaweedfs)")
+    sub_pdftrio = subparsers.add_parser(
+        "pdftrio", help="backfill a pdftrio JSON ('pg') dump into postgresql and s3 (seaweedfs)"
+    )
     sub_pdftrio.set_defaults(func=run_pdftrio)
-    sub_pdftrio.add_argument('json_file',
+    sub_pdftrio.add_argument(
+        "json_file",
         help="pdftrio file to import from (or '-' for stdin)",
-        type=argparse.FileType('r'))
+        type=argparse.FileType("r"),
+    )
 
-    sub_ingest_file_result = subparsers.add_parser('ingest-file-result',
-        help="backfill a ingest_file_result JSON dump into postgresql")
+    sub_ingest_file_result = subparsers.add_parser(
+        "ingest-file-result", help="backfill a ingest_file_result JSON dump into postgresql"
+    )
     sub_ingest_file_result.set_defaults(func=run_ingest_file_result)
-    sub_ingest_file_result.add_argument('json_file',
+    sub_ingest_file_result.add_argument(
+        "json_file",
         help="ingest_file_result file to import from (or '-' for stdin)",
-        type=argparse.FileType('r'))
+        type=argparse.FileType("r"),
+    )
 
-    sub_ingest_request = subparsers.add_parser('ingest-request',
-        help="backfill a ingest_request JSON dump into postgresql")
+    sub_ingest_request = subparsers.add_parser(
+        "ingest-request", help="backfill a ingest_request JSON dump into postgresql"
+    )
     sub_ingest_request.set_defaults(func=run_ingest_request)
-    sub_ingest_request.add_argument('json_file',
+    sub_ingest_request.add_argument(
+        "json_file",
         help="ingest_request to import from (or '-' for stdin)",
-        type=argparse.FileType('r'))
+        type=argparse.FileType("r"),
+    )
+
+    sub_crossref = subparsers.add_parser(
+        "crossref",
+        help="backfill a crossref JSON dump into postgresql, and extract references at the same time",
+    )
+    sub_crossref.set_defaults(func=run_crossref)
+    sub_crossref.add_argument(
+        "json_file",
+        help="crossref file to import from (or '-' for stdin)",
+        type=argparse.FileType("r"),
+    )
+    sub_crossref.add_argument(
+        "--grobid-host", default="https://grobid.qa.fatcat.wiki", help="GROBID API host/port"
+    )
+    sub_crossref.add_argument(
+        "--parse-refs",
+        action="store_true",
+        help="use GROBID to parse any unstructured references (default is to not)",
+    )
+
+    sub_grobid_refs = subparsers.add_parser(
+        "grobid-refs", help="backfill a grobid_refs JSON dump into postgresql"
+    )
+    sub_grobid_refs.set_defaults(func=run_grobid_refs)
+    sub_grobid_refs.add_argument(
+        "json_file",
+        help="grobid_refs to import from (or '-' for stdin)",
+        type=argparse.FileType("r"),
+    )
 
     args = parser.parse_args()
     if not args.__dict__.get("func"):
@@ -207,5 +306,6 @@ def main():
 
     args.func(args)
 
-if __name__ == '__main__':
+
+if __name__ == "__main__":
     main()
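Example usage of the two subcommands this commit introduces (a sketch only: the dump file paths are placeholders, and the GROBID host shown is just the flag's default):

    # Backfill a Crossref metadata dump; --parse-refs additionally runs unstructured
    # references through GROBID, which is why run_crossref() drops the batch size
    # from 200 to 10 (each batch may now trigger slow GROBID API calls).
    ./persist_tool.py --db-url postgres:///sandcrawler \
        crossref /path/to/crossref-dump.json \
        --grobid-host https://grobid.qa.fatcat.wiki \
        --parse-refs

    # Backfill a grobid_refs JSON dump into postgresql.
    ./persist_tool.py --db-url postgres:///sandcrawler \
        grobid-refs /path/to/grobid_refs-dump.json

Note that global flags like --db-url go before the subcommand name, while subcommand-specific flags (--grobid-host, --parse-refs) go after it, per the argparse subparser setup in main().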