#!/usr/bin/env python3
"""
Tool for bulk uploading GROBID TEI-XML output from a local filesystem dump
(from HBase) to AWS S3.

See unpaywall delivery README (in bnewbold's scratch repo) for notes on running
this script for that specific use-case.

Script takes:
- input TSV: `sha1_hex, json (including grobid0:tei_xml)`
    => usually from dumpgrobid, with SHA-1 key transformed to hex, and filtered
       down (eg, by join by SHA-1) to a specific manifest
- AWS S3 bucket and prefix

AWS S3 credentials are passed via environment variables (AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY)

Output:
- errors/stats to stderr
- log to stdout (redirect to file), prefixed by sha1

Requires:
- sentry-sdk
- boto3 (AWS S3 client library)
"""

import argparse
import base64
import hashlib
import json
import os
import sys
from collections import Counter

import boto3
import sentry_sdk


def b32_hex(s):
    """Normalize a SHA-1 identifier to lowercase hex.

    Takes the first whitespace-delimited token of ``s``, lowercases it and
    drops a leading ``sha1:`` prefix. A remainder of exactly 32 characters is
    decoded as base32 and re-encoded as lowercase hex; any other length is
    returned unchanged (already lowercased), so callers must still check the
    length of the result.

    Raises:
        IndexError: if ``s`` is empty or contains only whitespace.
        binascii.Error: if a 32-character token is not valid base32.
    """
    s = s.strip().split()[0].lower()
    if s.startswith("sha1:"):
        s = s[5:]
    if len(s) != 32:
        return s
    # b32decode() only accepts the upper-case alphabet by default
    return base64.b16encode(base64.b32decode(s.upper())).lower().decode("utf-8")


class DeliverDumpGrobidS3:
    """Uploads TEI-XML documents from a TSV dump into an S3 bucket.

    Objects are written under the key
    ``<s3_prefix><first 4 hex chars of sha1>/<sha1><s3_suffix>``.
    """

    def __init__(self, s3_bucket, **kwargs):
        """Create the boto3 S3 resource and bucket handle.

        Args:
            s3_bucket: name of the bucket to upload into.
            **kwargs: optional ``s3_prefix`` (default ``"grobid/"``),
                ``s3_suffix`` (default ``".tei.xml"``) and
                ``s3_storage_class`` (default ``"STANDARD"``). Any other
                keys are ignored, which lets ``main()`` pass the whole
                argparse namespace.
        """
        self.rstore = None
        self.count = Counter()
        self.s3_bucket = s3_bucket
        self.s3_prefix = kwargs.get("s3_prefix", "grobid/")
        self.s3_suffix = kwargs.get("s3_suffix", ".tei.xml")
        self.s3_storage_class = kwargs.get("s3_storage_class", "STANDARD")
        self.s3 = boto3.resource("s3")
        self.bucket = self.s3.Bucket(self.s3_bucket)

    def run(self, dump_file):
        """Upload every usable row of ``dump_file`` to S3.

        Each line is expected to hold two tab-separated columns: a SHA-1 key
        and a JSON object. Per-row results are printed to stdout (prefixed by
        the SHA-1); the final counters are written to stderr.

        Args:
            dump_file: iterable of text lines (eg, an open file or stdin).

        Raises:
            ValueError: if a key is not a 40-character hex SHA-1 after
                normalization.
        """
        sys.stderr.write("Starting...\n")
        for line in dump_file:
            line = line.strip().split("\t")
            if len(line) != 2:
                self.count["skip-line"] += 1
                continue
            sha1_hex, grobid_json = line[0], line[1]
            if len(sha1_hex) != 40:
                sha1_hex = b32_hex(sha1_hex)
            # Explicit check instead of `assert`: assertions are stripped under
            # `python -O`, which would let a malformed key become an S3 path.
            if len(sha1_hex) != 40:
                raise ValueError(
                    "expected a 40-character SHA-1 hex key, got: {!r}".format(sha1_hex)
                )
            grobid = json.loads(grobid_json)
            # NOTE(review): the module docstring mentions `grobid0:tei_xml`, but
            # the key read here is `tei_xml` -- confirm against the dump format.
            tei_xml = grobid.get("tei_xml")
            if not tei_xml:
                print("{}\tskip empty".format(sha1_hex))
                self.count["skip-empty"] += 1
                continue
            tei_xml = tei_xml.encode("utf-8")
            # upload to AWS S3
            obj = self.bucket.put_object(
                Key="{}{}/{}{}".format(self.s3_prefix, sha1_hex[0:4], sha1_hex, self.s3_suffix),
                Body=tei_xml,
                StorageClass=self.s3_storage_class,
            )
            print("{}\tsuccess\t{}\t{}".format(sha1_hex, obj.key, len(tei_xml)))
            self.count["success-s3"] += 1
        sys.stderr.write("{}\n".format(self.count))


def main():
    """Parse command-line arguments and run the upload worker."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--s3-bucket", required=True, type=str, help="AWS S3 bucket to upload into"
    )
    parser.add_argument(
        "--s3-prefix",
        type=str,
        default="grobid/",
        help="key prefix for items created in bucket",
    )
    parser.add_argument(
        "--s3-suffix", type=str, default=".tei.xml", help="file suffix for created objects"
    )
    parser.add_argument(
        "--s3-storage-class",
        type=str,
        default="STANDARD",
        help="AWS S3 storage class (redundancy) to use",
    )
    # nargs="?" makes the positional optional; without it argparse always
    # requires the argument and the stdin default is never used.
    parser.add_argument(
        "dump_file",
        nargs="?",
        help="TSV/JSON dump file",
        default=sys.stdin,
        type=argparse.FileType("r"),
    )
    args = parser.parse_args()
    sentry_sdk.init()

    # The worker ignores namespace entries it does not know (eg, dump_file).
    worker = DeliverDumpGrobidS3(**vars(args))
    worker.run(args.dump_file)


if __name__ == "__main__":  # pragma: no cover
    main()