From 6a5a0b090d7f303f3332759d63ffd0ac77cdd28c Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 26 Dec 2019 21:17:27 -0800 Subject: add PersistGrobidDiskWorker To help with making dumps directly from Kafka (eg, for partner delivery) --- python/persist_tool.py | 27 +++++++++++++++++++++++++++ python/sandcrawler/persist.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/python/persist_tool.py b/python/persist_tool.py index 309601b..29345e2 100755 --- a/python/persist_tool.py +++ b/python/persist_tool.py @@ -50,6 +50,20 @@ def run_grobid(args): ) pusher.run() +def run_grobid_disk(args): + """ + Writes XML to individual files on disk, and also prints non-XML metadata to + stdout as JSON, which can be redirected to a separate file. + """ + worker = PersistGrobidDiskWorker( + output_dir=args.output_dir, + ) + pusher = JsonLinePusher( + worker, + args.json_file, + ) + pusher.run() + def run_ingest_file_result(args): worker = PersistIngestFileResultWorker( db_url=args.db_url, @@ -97,6 +111,19 @@ def main(): sub_grobid.add_argument('json_file', help="grobid file to import from (or '-' for stdin)", type=argparse.FileType('r')) + sub_grobid.add_argument('--s3-only', + action='store_true', + help="only upload TEI-XML to S3 (don't write to database)") + + sub_grobid_disk = subparsers.add_parser('grobid-disk', + help="dump GRBOID output to (local) files on disk") + sub_grobid_disk.set_defaults(func=run_grobid_disk) + sub_grobid_disk.add_argument('json_file', + help="grobid file to import from (or '-' for stdin)", + type=argparse.FileType('r')) + sub_grobid_disk.add_argument('output_dir', + help="base directory to output into", + type=str) sub_ingest_file_result = subparsers.add_parser('ingest-file-result', help="backfill a ingest_file_result JSON dump into postgresql") diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index b017a82..9f8171c 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -245,3 +245,36 @@ class PersistGrobidWorker(SandcrawlerWorker): self.db.commit() return [] + +class PersistGrobidDiskWorker(SandcrawlerWorker): + """ + Writes blobs out to disk. + + This could be refactored into a "Sink" type with an even thinner wrapper. + """ + + def __init__(self, output_dir): + super().__init__() + self.output_dir = output_dir + + def _blob_path(self, sha1hex, extension=".tei.xml"): + obj_path = "{}/{}/{}{}".format( + sha1hex[0:2], + sha1hex[2:4], + sha1hex, + extension, + ) + return obj_path + + def process(self, record): + + if record['status_code'] != 200 or not record.get('tei_xml'): + return False + assert(len(record['key'])) == 40 + p = "{}/{}".format(self.output_dir, self._blob_path(record['key'])) + os.makedirs(os.path.dirname(p), exist_ok=True) + with open(p, 'w') as f: + f.write(record.pop('tei_xml')) + self.counts['written'] += 1 + return record + -- cgit v1.2.3