aboutsummaryrefslogtreecommitdiffstats
path: root/python/scripts/deliver_dumpgrobid_to_s3.py
blob: ac0949c3acd745584735ff6349be94fcc32fe730 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python3
"""
Tool for bulk uploading GROBID TEI-XML output from a local filesystem dump
(from HBase) to AWS S3.

See unpaywall delivery README (in bnewbold's scratch repo) for notes on running
this script for that specific use-case.

Script takes:
- input TSV: `sha1_hex, json (including grobid0:tei_xml)`
    => usually from dumpgrobid, with SHA-1 key transformed to hex, and filtered
       down (eg, by join by SHA-1) to a specific manifest
- AWS S3 bucket and prefix

AWS S3 credentials are passed via environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

Output:
- errors/stats to stderr
- log to stdout (redirect to file), prefixed by sha1

Requires:
- raven (sentry)
- boto3 (AWS S3 client library)
"""

import os
import sys
import json
import base64
import hashlib
import argparse
from collections import Counter

import boto3
import raven

# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
sentry_client = raven.Client()


def b32_hex(s):
    """copy/pasta from elsewhere"""
    s = s.strip().split()[0].lower()
    if s.startswith("sha1:"):
        s = s[5:]
    if len(s) != 32:
        return s
    return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')


class DeliverDumpGrobidS3():

    def __init__(self, s3_bucket, **kwargs):
        self.rstore = None
        self.count = Counter()
        self.s3_bucket = s3_bucket
        self.s3_prefix = kwargs.get('s3_prefix', 'grobid/')
        self.s3_suffix = kwargs.get('s3_suffix', '.tei.xml')
        self.s3_storage_class = kwargs.get('s3_storage_class', 'STANDARD')
        self.s3 = boto3.resource('s3')
        self.bucket = self.s3.Bucket(self.s3_bucket)

    def run(self, dump_file):
        sys.stderr.write("Starting...\n")
        for line in dump_file:
            line = line.strip().split('\t')
            if len(line) != 2:
                self.count['skip-line'] += 1
                continue
            sha1_hex, grobid_json = line[0], line[1]
            if len(sha1_hex) != 40:
                sha1_hex = b32_hex(sha1_hex)
            assert len(sha1_hex) == 40
            grobid = json.loads(grobid_json)
            tei_xml = grobid.get('tei_xml')
            if not tei_xml:
                print("{}\tskip empty".format(sha1_hex))
                self.count['skip-empty'] += 1
                continue
            tei_xml = tei_xml.encode('utf-8')
            # upload to AWS S3
            obj = self.bucket.put_object(
                Key="{}{}/{}{}".format(
                    self.s3_prefix,
                    sha1_hex[0:4],
                    sha1_hex,
                    self.s3_suffix,
                    StorageClass=self.s3_storage_class),
                Body=tei_xml)
            print("{}\tsuccess\t{}\t{}".format(sha1_hex, obj.key, len(tei_xml)))
            self.count['success-s3'] += 1
        sys.stderr.write("{}\n".format(self.count))

@sentry_client.capture_exceptions
def main():

    parser = argparse.ArgumentParser()
    parser.add_argument('--s3-bucket',
                        required=True,
                        type=str,
                        help='AWS S3 bucket to upload into')
    parser.add_argument('--s3-prefix',
                        type=str,
                        default="grobid/",
                        help='key prefix for items created in bucket')
    parser.add_argument('--s3-suffix',
                        type=str,
                        default=".tei.xml",
                        help='file suffix for created objects')
    parser.add_argument('--s3-storage-class',
                        type=str,
                        default="STANDARD",
                        help='AWS S3 storage class (redundancy) to use')
    parser.add_argument('dump_file',
                        help="TSV/JSON dump file",
                        default=sys.stdin,
                        type=argparse.FileType('r'))
    args = parser.parse_args()

    worker = DeliverDumpGrobidS3(**args.__dict__)
    worker.run(args.dump_file)

if __name__ == '__main__': # pragma: no cover
    main()