aboutsummaryrefslogtreecommitdiffstats
path: root/python/scripts/deliver_dumpgrobid_to_s3.py
blob: 27ccf21c77ac0dedbc2df7f295babd698fcad766 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python3
"""
Tool for bulk uploading GROBID TEI-XML output from a local filesystem dump
(from HBase) to AWS S3.

See unpaywall delivery README (in bnewbold's scratch repo) for notes on running
this script for that specific use-case.

Script takes:
- input TSV: `sha1_hex, json (including grobid0:tei_xml)`
    => usually from dumpgrobid, with SHA-1 key transformed to hex, and filtered
       down (eg, by join by SHA-1) to a specific manifest
- AWS S3 bucket and prefix

AWS S3 credentials are passed via environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

Output:
- errors/stats to stderr
- log to stdout (redirect to file), prefixed by sha1

Requires:
- sentry-sdk
- boto3 (AWS S3 client library)
"""

import argparse
import base64
import hashlib
import json
import os
import sys
from collections import Counter

import boto3
import sentry_sdk


def b32_hex(s):
    """copy/pasta from elsewhere"""
    s = s.strip().split()[0].lower()
    if s.startswith("sha1:"):
        s = s[5:]
    if len(s) != 32:
        return s
    return base64.b16encode(base64.b32decode(s.upper())).lower().decode("utf-8")


class DeliverDumpGrobidS3:
    def __init__(self, s3_bucket, **kwargs):
        self.rstore = None
        self.count = Counter()
        self.s3_bucket = s3_bucket
        self.s3_prefix = kwargs.get("s3_prefix", "grobid/")
        self.s3_suffix = kwargs.get("s3_suffix", ".tei.xml")
        self.s3_storage_class = kwargs.get("s3_storage_class", "STANDARD")
        self.s3 = boto3.resource("s3")
        self.bucket = self.s3.Bucket(self.s3_bucket)

    def run(self, dump_file):
        sys.stderr.write("Starting...\n")
        for line in dump_file:
            line = line.strip().split("\t")
            if len(line) != 2:
                self.count["skip-line"] += 1
                continue
            sha1_hex, grobid_json = line[0], line[1]
            if len(sha1_hex) != 40:
                sha1_hex = b32_hex(sha1_hex)
            assert len(sha1_hex) == 40
            grobid = json.loads(grobid_json)
            tei_xml = grobid.get("tei_xml")
            if not tei_xml:
                print("{}\tskip empty".format(sha1_hex))
                self.count["skip-empty"] += 1
                continue
            tei_xml = tei_xml.encode("utf-8")
            # upload to AWS S3
            obj = self.bucket.put_object(
                Key="{}{}/{}{}".format(self.s3_prefix, sha1_hex[0:4], sha1_hex, self.s3_suffix),
                Body=tei_xml,
                StorageClass=self.s3_storage_class,
            )
            print("{}\tsuccess\t{}\t{}".format(sha1_hex, obj.key, len(tei_xml)))
            self.count["success-s3"] += 1
        sys.stderr.write("{}\n".format(self.count))


def main():

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--s3-bucket", required=True, type=str, help="AWS S3 bucket to upload into"
    )
    parser.add_argument(
        "--s3-prefix",
        type=str,
        default="grobid/",
        help="key prefix for items created in bucket",
    )
    parser.add_argument(
        "--s3-suffix", type=str, default=".tei.xml", help="file suffix for created objects"
    )
    parser.add_argument(
        "--s3-storage-class",
        type=str,
        default="STANDARD",
        help="AWS S3 storage class (redundancy) to use",
    )
    parser.add_argument(
        "dump_file", help="TSV/JSON dump file", default=sys.stdin, type=argparse.FileType("r")
    )
    args = parser.parse_args()

    sentry_sdk.init()

    worker = DeliverDumpGrobidS3(**args.__dict__)
    worker.run(args.dump_file)


if __name__ == "__main__":  # pragma: no cover
    main()