diff options
Diffstat (limited to 'python/extraction_ungrobided.py')
-rwxr-xr-x | python/extraction_ungrobided.py | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py new file mode 100755 index 0000000..aedf715 --- /dev/null +++ b/python/extraction_ungrobided.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +""" +Variant of extraction_cdx_grobid which takes a partial metadata list as input +instead of CDX. + +This task list is dumped by another Hadoop job which scans over the HBase table +quickly, which allows this job to skip a (relatively) expensive HBase read +per-row. + +Requires: +- happybase +- mrjob +- wayback/GWB libraries +""" + +# XXX: some broken MRO thing going on in here due to python3 object wrangling +# in `wayback` library. Means we can't run pylint. +# pylint: skip-file + +import xml +import json +import raven +import struct +import mrjob +from common import parse_ungrobided_line +from extraction_cdx_grobid import MRExtractCdxGrobid, KEY_BLACKLIST, \ + sentry_client + + +class MRExtractUnGrobided(MRExtractCdxGrobid): + + # CDX lines in; JSON status out + #HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat' + #INPUT_PROTOCOL = mrjob.protocol.RawProtocol + INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol + OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol + + def parse_line(self, raw_line): + """Line should be TSV and have non-null fields: + + - key (string) + - f:c (string, json) + - file:mime (string) + - file:cdx (string, json) + """ + + if (raw_line.startswith(' ') or raw_line.startswith('#')): + return None, dict(status="invalid", reason="line prefix", input=raw_line) + + info = parse_ungrobided_line(raw_line) + if info is None: + return None, dict(status="invalid", reason="ungrobided parse") + + if info['file:mime'] not in self.mime_filter: + return None, dict(status="skip", reason="mimetype", mimetype=info['file:mime']) + + # If warc is not item/file.(w)arc.gz form, skip it + if len(info['file:cdx']['warc'].split('/')) != 2: + return None, dict(status="skip", reason="WARC path not petabox item/file", path=info['file:cdx']['warc']) + + return info, None + + @sentry_client.capture_exceptions + def mapper(self, _, raw_line): + """ + 1. parse filtered line + 2. fetch data from wayback + 3. submit to GROBID + 4. convert GROBID response to JSON (and metadata) + 6. determine "quality" + 6. push results to hbase + """ + + self.increment_counter('lines', 'total') + + # Parse line and filter down + info, status = self.parse_line(raw_line) + if info is None: + self.increment_counter('lines', status['status']) + yield _, status + return + key = info['key'] + if key in KEY_BLACKLIST: + self.increment_counter('lines', 'blacklist') + yield _, dict(status='blacklist', key=key) + return + + # Note: this may not get "cleared" correctly + sentry_client.extra_context(dict(row_key=key)) + + # Do the extraction + info, status = self.extract(info) + if info is None: + self.increment_counter('lines', status['status']) + status['key'] = key + yield _, status + return + extraction_status = status + + # Decide what to bother inserting back into HBase + # Basically, don't overwrite backfill fields. + grobid_status_code = info.get('grobid0:status_code', None) + for k in list(info.keys()): + if k.encode('utf-8') in ('f:c', 'file:mime', 'file:cdx'): + info.pop(k) + + # Convert fields to binary + for k in list(info.keys()): + if info[k] is None: + info.pop(k) + # NOTE: we're not actually sending these f:*, file:* keys... + elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json', + 'grobid0:metadata'): + assert type(info[k]) == dict + info[k] = json.dumps(info[k], sort_keys=True, indent=None) + elif k in ('file:size', 'grobid0:status_code'): + # encode as int64 in network byte order + if info[k] != {} and info[k] != None: + info[k] = struct.pack('!q', info[k]) + + key = info.pop('key') + self.hb_table.put(key, info) + self.increment_counter('lines', 'success') + + if extraction_status is not None: + yield _, dict(status="partial", key=key, + grobid_status_code=grobid_status_code, + reason=extraction_status['reason']) + else: + yield _, dict(status="success", + grobid_status_code=grobid_status_code, key=key, + extra=extraction_status) + + +if __name__ == '__main__': # pragma: no cover + MRExtractCdxGrobid.run() |