aboutsummaryrefslogtreecommitdiffstats
path: root/python/extraction_ungrobided.py
blob: 74644e03845f9c37dc543b5575398c047fb544f3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python3
"""
Variant of extraction_cdx_grobid which takes a partial metadata list as input
instead of CDX. 

This task list is dumped by another Hadoop job which scans over the HBase table
quickly, which allows this job to skip a (relatively) expensive HBase read
per-row.

Requires:
- happybase
- mrjob
- wayback/GWB libraries
"""

# XXX: some broken MRO thing going on in here due to python3 object wrangling
# in `wayback` library. Means we can't run pylint.
# pylint: skip-file

import xml
import json
import raven
import struct
import mrjob
from common import parse_ungrobided_line
from extraction_cdx_grobid import MRExtractCdxGrobid, KEY_BLACKLIST, \
    sentry_client


class MRExtractUnGrobided(MRExtractCdxGrobid):
    """Variant of MRExtractCdxGrobid driven by "ungrobided" TSV task lines.

    Each input line is parsed with ``common.parse_ungrobided_line``, filtered,
    passed to the inherited ``extract()`` and the resulting columns are
    written to ``self.hb_table``. One JSON status dict is emitted per line.

    NOTE(review): ``mime_filter``, ``extract`` and ``hb_table`` are not defined
    here; presumably they are provided by MRExtractCdxGrobid -- confirm.
    """

    # "ungrobided" TSV lines in; JSON status out
    #HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat'
    #INPUT_PROTOCOL = mrjob.protocol.RawProtocol
    INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
    OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol

    # Columns that arrive with the task line ("backfill" fields); they are
    # dropped before the HBase put so existing values are not overwritten.
    _BACKFILL_COLUMNS = ('f:c', 'file:mime', 'file:cdx')

    # Columns holding dicts which are stored as JSON strings.
    _JSON_COLUMNS = ('grobid0:status', 'grobid0:tei_json', 'grobid0:metadata')

    # Columns holding integers which are stored as packed int64.
    _INT64_COLUMNS = ('file:size', 'grobid0:status_code')

    def parse_ungrobided_line(self, raw_line):
        """Parse and filter a single "ungrobided" task line.

        Line should be TSV and have non-null fields:

            - key (string)
            - f:c (string, json)
            - file:mime (string)
            - file:cdx (string, json)

        Returns a ``(info, status)`` tuple. On success ``info`` is the parsed
        dict and ``status`` is None; otherwise ``info`` is None and ``status``
        is a dict with ``status`` ("invalid" or "skip") and ``reason`` keys.
        """

        # Lines starting with a space or '#' are rejected outright
        if raw_line.startswith((' ', '#')):
            return None, dict(status="invalid", reason="line prefix", input=raw_line)

        # Module-level helper from `common` (the method only shares its name)
        info = parse_ungrobided_line(raw_line)
        if info is None:
            return None, dict(status="invalid", reason="ungrobided parse")

        mimetype = info['file:mime']
        if mimetype not in self.mime_filter:
            return None, dict(status="skip", reason="mimetype", mimetype=mimetype)

        # If warc is not item/file.(w)arc.gz form, skip it
        warc_path = info['file:cdx']['warc']
        if len(warc_path.split('/')) != 2:
            return None, dict(status="skip", reason="WARC path not petabox item/file", path=warc_path)

        return info, None

    @sentry_client.capture_exceptions
    def mapper(self, _, raw_line):
        """
        1. parse filtered line
        2. fetch data from wayback
        3. submit to GROBID
          4. convert GROBID response to JSON (and metadata)
          5. determine "quality"
        6. push results to hbase

        Steps 2-5 happen inside the inherited ``self.extract()``. Yields a
        single ``(key, status_dict)`` pair per input line; the incoming key
        ``_`` is passed through unchanged.
        """

        self.increment_counter('lines', 'total')

        # Parse line and filter down
        info, status = self.parse_ungrobided_line(raw_line)
        if info is None:
            self.increment_counter('lines', status['status'])
            yield _, status
            return
        key = info['key']
        if key in KEY_BLACKLIST:
            self.increment_counter('lines', 'blacklist')
            yield _, dict(status='blacklist', key=key)
            return

        # Note: this may not get "cleared" correctly
        sentry_client.extra_context(dict(row_key=key))

        # Do the extraction
        info, status = self.extract(info)
        if info is None:
            self.increment_counter('lines', status['status'])
            status['key'] = key
            yield _, status
            return
        extraction_status = status

        # Decide what to bother inserting back into HBase
        # Basically, don't overwrite backfill fields.
        # (Keys of `info` are str; comparing utf-8 encoded bytes against the
        # str column names never matches on python3, so compare str to str.)
        grobid_status_code = info.get('grobid0:status_code', None)
        for column in self._BACKFILL_COLUMNS:
            info.pop(column, None)

        # Convert fields to binary
        for k in list(info.keys()):
            value = info[k]
            if value is None:
                info.pop(k)
            elif k in self._JSON_COLUMNS:
                assert isinstance(value, dict)
                info[k] = json.dumps(value, sort_keys=True, indent=None)
            elif k in self._INT64_COLUMNS:
                # encode as int64 in network byte order; an empty dict is
                # left as-is rather than packed
                if value != {}:
                    info[k] = struct.pack('!q', value)

        # The row key is passed separately to put(), not stored as a column
        key = info.pop('key')
        self.hb_table.put(key, info)
        self.increment_counter('lines', 'success')

        if extraction_status is not None:
            yield _, dict(status="partial", key=key,
                grobid_status_code=grobid_status_code,
                reason=extraction_status['reason'])
        else:
            yield _, dict(status="success",
                grobid_status_code=grobid_status_code, key=key,
                extra=extraction_status)


if __name__ == '__main__': # pragma: no cover
    # Run the job defined in this module (not the parent CDX job), so that the
    # "ungrobided" input parsing and mapper above are the ones executed.
    MRExtractUnGrobided.run()