Diffstat (limited to 'mapreduce/extraction_cdx_grobid.py')
-rwxr-xr-x | mapreduce/extraction_cdx_grobid.py | 87
1 file changed, 50 insertions, 37 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py
index ea36e6e..0ba95e6 100755
--- a/mapreduce/extraction_cdx_grobid.py
+++ b/mapreduce/extraction_cdx_grobid.py
@@ -15,16 +15,19 @@ Requires:
 import io
 import sys
+import json
 import struct
 import requests
 import happybase
 import mrjob
 from mrjob.job import MRJob
+import wayback.exception
 from wayback.resource import Resource
 from wayback.resource import ArcResource
 from wayback.resourcestore import ResourceStore
 from gwb.loader import CDXLoaderFactory
 
 from common import parse_cdx_line
+from grobid2json import do_tei
 
 
 class MRExtractCdxGrobid(MRJob):
@@ -60,16 +63,13 @@ class MRExtractCdxGrobid(MRJob):
         super(MRExtractCdxGrobid, self).__init__(*args, **kwargs)
         self.mime_filter = ['application/pdf']
 
-    def grobid_fulltext(self, content):
+    def grobid_process_fulltext(self, content):
         r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
             files={'input': content})
         if r.status_code is not 200:
             # XXX:
-            print("FAIL (Grobid: {}): {}".format(r.content.decode('utf8')))
-        else:
-            # XXX:
-            print("SUCCESS: " + debug_line)
-        return r.json()
+            return None
+        return r
 
     def mapper_init(self):
@@ -104,47 +104,56 @@ class MRExtractCdxGrobid(MRJob):
 
         return info, None
 
-    def extract(self, info):
-
-        # Fetch data from WARCs in petabox
+    def fetch_warc_content(self, warc_path, offset, c_size):
         try:
             rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
-            gwb_record = rstore.load_resource(
-                info['file:cdx']['warc'],
-                info['file:cdx']['offset'],
-                info['file:cdx']['c_size'])
-        except IOError as ioe:
-            # XXX: catch correct error
-            self.increment_counter('lines', 'existing')
-            return _, dict(status="existing")
+            gwb_record = rstore.load_resource(warc_path, offset, c_size)
+        except wayback.exception.ResourceUnavailable as err:
+            # XXX: during testing
+            raise err
+            self.increment_counter('lines', 'petabox_error')
+            return None, dict(status="petabox_error", reason="failed to load file contents")
 
         if gwb_record.get_status()[0] != 200:
             self.increment_counter('lines', 'error')
-            return _, dict(status="error", reason="non-HTTP-200 WARC content")
+            return None, dict(status="error", reason="non-HTTP-200 WARC content")
+        return gwb_record.open_raw_content()
+
+    def extract(self, info):
+
+        # Fetch data from WARCs in petabox
+        content, status = self.fetch_warc_content(
+            info['file:cdx']['warc'],
+            info['file:cdx']['offset'],
+            info['file:cdx']['c_size'])
+        if status:
+            self.increment_counter('lines', status['status'])
+            return None, status
+
+        info['file:size'] = len(content)
 
         # Submit to GROBID
-        content = gwb_record.open_raw_content()
         try:
-            grobid_result = self.grobid_fulltext(gwb_record.open_raw_content())
+            grobid_response = self.grobid_process_fulltext(content)
         except IOError as ioe:
+            raise ioe
             # XXX: catch correct error
-            self.increment_counter('lines', 'existing')
-            return _, dict(status="existing")
-
-        info['file:size'] = len(resource_data)
+            self.increment_counter('lines', 'fail')
+            return None, dict(status="fail", reason="GROBID connection")
 
-        info['grobid0:status_code'] = None
-        info['grobid0:quality'] = None
-        info['grobid0:status'] = {}
-        info['grobid0:tei_xml'] = None
-        info['grobid0:tei_json'] = {}
-        info['grobid0:metadata'] = {}
+        info['grobid0:status_code'] = grobid_response.status_code
+        info['grobid0:tei_xml'] = grobid_response.content
+        info['grobid0:status'] = {} # TODO
 
         # Convert TEI XML to JSON
-        # TODO
+        # TODO:
+        info['grobid0:tei_json'] = do_tei(grobid_response.content, encumbered=True)
+        info['grobid0:metadata'] = do_tei(grobid_response.content, encumbered=False)
 
         # Determine extraction "quality"
-        # TODO
+        # TODO:
+
+        info['grobid0:quality'] = None
 
         return info, None
 
@@ -187,19 +196,22 @@ class MRExtractCdxGrobid(MRJob):
         # Decide what to bother inserting back into HBase
         # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')
         grobid_status = info.get('grobid0:status_code', None)
-        for k in info.keys():
+        for k in list(info.keys()):
             if k in oldrow:
                 info.pop(k)
 
         # Convert fields to binary
-        for k in info.keys():
-            if k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
+        for k in list(info.keys()):
+            if info[k] == None:
+                info.pop(k)
+            elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
                     'grobid0:metadata'):
                 assert type(info[k]) == dict
                 info[k] = json.dumps(info[k], sort_keys=True, indent=None)
-            if k in ('file:size', 'grobid0:status_code'):
+            elif k in ('file:size', 'grobid0:status_code'):
                 # encode as int64 in network byte order
-                info[k] = struct.pack('!q', info[k])
+                if info[k] != {} and info[k] != None:
+                    info[k] = struct.pack('!q', info[k])
 
         key = info.pop('key')
         self.hb_table.put(key, info)
@@ -207,6 +219,7 @@ class MRExtractCdxGrobid(MRJob):
 
         yield _, dict(status="success", grobid_status=grobid_status)
 
+
 if __name__ == '__main__': # pragma: no cover
     MRExtractCdxGrobid.run()
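
For context, the renamed grobid_process_fulltext() boils down to a single multipart POST against GROBID's processFulltextDocument endpoint, as seen in the hunk above. A minimal standalone sketch, assuming a GROBID service at http://localhost:8070 (GROBID's usual default port; the job itself reads the base URI from its grobid_uri option) and a local paper.pdf:

    import requests

    GROBID_URI = "http://localhost:8070"  # assumption: stands in for self.options.grobid_uri

    with open("paper.pdf", "rb") as pdf:
        r = requests.post(GROBID_URI + "/api/processFulltextDocument",
                          files={'input': pdf})

    if r.status_code == 200:
        tei_xml = r.content   # TEI XML bytes; the job stores these in grobid0:tei_xml
    else:
        tei_xml = None        # mirrors the early `return None` in the method above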
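The write-back hunk uses exactly two encodings before the HBase put: dict-valued columns are serialized to JSON, and integer columns are packed as a signed 64-bit big-endian value ("network byte order", struct format '!q'). A self-contained sketch of both, with made-up sample values:

    import json
    import struct

    info = {
        'grobid0:status': {},          # dict column -> JSON string
        'grobid0:status_code': 200,    # int column  -> 8-byte big-endian
        'file:size': 12345,            # int column  -> 8-byte big-endian (sample value)
    }

    # list() snapshots the keys so entries can be popped mid-loop,
    # which is why the diff wraps info.keys() above
    for k in list(info.keys()):
        if k in ('grobid0:status',):
            info[k] = json.dumps(info[k], sort_keys=True, indent=None)
        elif k in ('file:size', 'grobid0:status_code'):
            info[k] = struct.pack('!q', info[k])

    assert info['grobid0:status'] == '{}'
    assert info['grobid0:status_code'] == b'\x00\x00\x00\x00\x00\x00\x00\xc8'

In Python 3, dict.keys() returns a live view, so the original `for k in info.keys(): ... info.pop(k)` would raise RuntimeError for changing the dict's size during iteration; wrapping the view in list() is the standard fix.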