Diffstat (limited to 'python/extraction_ungrobided.py')
-rwxr-xr-x  python/extraction_ungrobided.py  136
1 file changed, 136 insertions(+), 0 deletions(-)
diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py
new file mode 100755
index 0000000..aedf715
--- /dev/null
+++ b/python/extraction_ungrobided.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python3
+"""
+Variant of extraction_cdx_grobid which takes a partial metadata list as input
+instead of CDX.
+
+This task list is dumped by another Hadoop job which scans over the HBase table
+quickly, which allows this job to skip a (relatively) expensive HBase read
+per-row.
+
+Requires:
+- happybase
+- mrjob
+- wayback/GWB libraries
+"""
+
+# XXX: some broken MRO thing going on in here due to python3 object wrangling
+# in `wayback` library. Means we can't run pylint.
+# pylint: skip-file
+
+import xml
+import json
+import raven
+import struct
+import mrjob
+from common import parse_ungrobided_line
+from extraction_cdx_grobid import MRExtractCdxGrobid, KEY_BLACKLIST, \
+    sentry_client
+
+
+class MRExtractUnGrobided(MRExtractCdxGrobid):
+
+    # Ungrobided TSV lines in; JSON status out
+    #HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat'
+    #INPUT_PROTOCOL = mrjob.protocol.RawProtocol
+    INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
+    OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
+
+    def parse_line(self, raw_line):
+        """Line should be TSV and have non-null fields:
+
+            - key (string)
+            - f:c (string, json)
+            - file:mime (string)
+            - file:cdx (string, json)
+        """
+
+        if raw_line.startswith(' ') or raw_line.startswith('#'):
+            return None, dict(status="invalid", reason="line prefix", input=raw_line)
+
+        info = parse_ungrobided_line(raw_line)
+        if info is None:
+            return None, dict(status="invalid", reason="ungrobided parse")
+
+        if info['file:mime'] not in self.mime_filter:
+            return None, dict(status="skip", reason="mimetype", mimetype=info['file:mime'])
+
+        # If warc is not in item/file.(w)arc.gz form, skip it
+        if len(info['file:cdx']['warc'].split('/')) != 2:
+            return None, dict(status="skip", reason="WARC path not petabox item/file", path=info['file:cdx']['warc'])
+
+        return info, None
+
+    @sentry_client.capture_exceptions
+    def mapper(self, _, raw_line):
+        """
+        1. parse filtered line
+        2. fetch data from wayback
+        3. submit to GROBID
+        4. convert GROBID response to JSON (and metadata)
+        5. determine "quality"
+        6. push results to hbase
+        """
+
+        self.increment_counter('lines', 'total')
+
+        # Parse line and filter down
+        info, status = self.parse_line(raw_line)
+        if info is None:
+            self.increment_counter('lines', status['status'])
+            yield _, status
+            return
+        key = info['key']
+        if key in KEY_BLACKLIST:
+            self.increment_counter('lines', 'blacklist')
+            yield _, dict(status='blacklist', key=key)
+            return
+
+        # Note: this may not get "cleared" correctly
+        sentry_client.extra_context(dict(row_key=key))
+
+        # Do the extraction
+        info, status = self.extract(info)
+        if info is None:
+            self.increment_counter('lines', status['status'])
+            status['key'] = key
+            yield _, status
+            return
+        extraction_status = status
+
+        # Decide what to bother inserting back into HBase.
+        # Basically, don't overwrite backfill fields.
+        grobid_status_code = info.get('grobid0:status_code', None)
+        for k in list(info.keys()):
+            if k in ('f:c', 'file:mime', 'file:cdx'):
+                info.pop(k)
+
+        # Convert fields to binary
+        for k in list(info.keys()):
+            if info[k] is None:
+                info.pop(k)
+            # NOTE: we're not actually sending these f:*, file:* keys...
+            elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
+                    'grobid0:metadata'):
+                assert isinstance(info[k], dict)
+                info[k] = json.dumps(info[k], sort_keys=True, indent=None)
+            elif k in ('file:size', 'grobid0:status_code'):
+                # encode as int64 in network byte order
+                if info[k] != {} and info[k] is not None:
+                    info[k] = struct.pack('!q', info[k])
+
+        key = info.pop('key')
+        self.hb_table.put(key, info)
+        self.increment_counter('lines', 'success')
+
+        if extraction_status is not None:
+            yield _, dict(status="partial", key=key,
+                grobid_status_code=grobid_status_code,
+                reason=extraction_status['reason'])
+        else:
+            yield _, dict(status="success",
+                grobid_status_code=grobid_status_code, key=key,
+                extra=extraction_status)
+
+
+if __name__ == '__main__': # pragma: no cover
+    MRExtractUnGrobided.run()
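For reference, a minimal sketch of the parse_ungrobided_line helper imported from common, which this diff does not include. It assumes exactly the four tab-separated fields documented in parse_line() above; the real implementation in common.py may differ:

    import json

    def parse_ungrobided_line(raw_line):
        # Split one task-list row into: key, f:c (JSON), file:mime, file:cdx (JSON)
        fields = raw_line.rstrip('\n').split('\t')
        if len(fields) != 4:
            return None
        key, f_c, mime, cdx = fields
        try:
            return {
                'key': key,
                'f:c': json.loads(f_c),
                'file:mime': mime,
                'file:cdx': json.loads(cdx),
            }
        except ValueError:  # covers json.JSONDecodeError
            return None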
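The "convert fields to binary" loop in the mapper normalizes every HBase cell to bytes: dict-valued columns become canonical JSON (sorted keys, no indentation) and integer columns are packed as signed 64-bit values in network byte order (big-endian). A small round-trip sketch of that encoding:

    import json
    import struct

    status_code = 200
    packed = struct.pack('!q', status_code)        # 8 bytes, big-endian signed int64
    assert struct.unpack('!q', packed)[0] == status_code

    metadata = {'title': 'Example', 'authors': []}
    cell = json.dumps(metadata, sort_keys=True, indent=None)
    assert json.loads(cell) == metadata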

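Usage note: with INPUT_PROTOCOL set to RawValueProtocol, the mapper receives each raw TSV line as its value (the key is None, hence the unused `_` argument), and JSONValueProtocol writes each yielded status dict as one JSON line of output. A job like this is launched in the usual mrjob way (input path hypothetical):

    python extraction_ungrobided.py -r hadoop hdfs:///user/example/ungrobided.tsv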