diff options
Diffstat (limited to 'mapreduce/extraction_cdx_grobid.py')
-rwxr-xr-x | mapreduce/extraction_cdx_grobid.py | 50 |
1 files changed, 27 insertions, 23 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py index bcb8663..9a0d795 100755 --- a/mapreduce/extraction_cdx_grobid.py +++ b/mapreduce/extraction_cdx_grobid.py @@ -18,6 +18,7 @@ Requires: import io import sys +import xml import json import struct import requests @@ -67,10 +68,6 @@ class MRExtractCdxGrobid(MRJob): def grobid_process_fulltext(self, content): r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument", files={'input': content}) - if r.status_code is not 200: - # if invalid file, get a 400 with JSON body with 'description' key (and others) - # XXX: - return None return r def mapper_init(self): @@ -98,12 +95,10 @@ class MRExtractCdxGrobid(MRJob): return None, dict(status="invalid", reason="CDX parse") if info['file:mime'] not in self.mime_filter: - self.increment_counter('lines', 'skip') return None, dict(status="skip", reason="mimetype") # If warc is not item/file.(w)arc.gz form, skip it if len(info['file:cdx']['warc'].split('/')) != 2: - self.increment_counter('lines', 'skip') return None, dict(status="skip", reason="WARC path not petabox item/file") return info, None @@ -114,13 +109,9 @@ class MRExtractCdxGrobid(MRJob): rstore = ResourceStore(loaderfactory=CDXLoaderFactory()) gwb_record = rstore.load_resource(warc_uri, offset, c_size) except wayback.exception.ResourceUnavailable as err: - # XXX: during testing - raise err - self.increment_counter('lines', 'petabox_error') return None, dict(status="petabox_error", reason="failed to load file contents") if gwb_record.get_status()[0] != 200: - self.increment_counter('lines', 'error') return None, dict(status="error", reason="archived HTTP response (WARC) was not 200", warc_status=gwb_record.get_status()[0]) @@ -134,7 +125,6 @@ class MRExtractCdxGrobid(MRJob): info['file:cdx']['offset'], info['file:cdx']['c_size']) if status: - self.increment_counter('lines', status['status']) return None, status info['file:size'] = len(original_content) @@ -142,20 +132,32 @@ class MRExtractCdxGrobid(MRJob): # Submit to GROBID try: grobid_response = self.grobid_process_fulltext(original_content) - except IOError as ioe: - raise ioe - # XXX: catch correct error - self.increment_counter('lines', 'fail') - return None, dict(status="fail", reason="GROBID connection") + except requests.exceptions.ConnectionError: + return None, dict(status="error", reason="connection to GROBID worker") info['grobid0:status_code'] = grobid_response.status_code + if grobid_response.status_code is not 200: + # response.text is .content decoded as utf-8 + info['grobid0:status'] = json.loads(grobid_response.text) + return info, dict(status="error", reason="non-200 GROBID HTTP status", + extra=grobid_response.content) + + info['grobid0:status'] = {} info['grobid0:tei_xml'] = grobid_response.content - info['grobid0:status'] = {} # TODO # Convert TEI XML to JSON - # TODO: - info['grobid0:tei_json'] = teixml2json(grobid_response.content, encumbered=True) - info['grobid0:metadata'] = teixml2json(grobid_response.content, encumbered=False) + try: + info['grobid0:tei_json'] = teixml2json(grobid_response.content, encumbered=True) + except xml.etree.ElementTree.ParseError: + return info, dict(status="fail", reason="GROBID 200 XML parse error") + except ValueError: + return info, dict(status="fail", reason="GROBID 200 XML non-TEI content") + + tei_metadata = info['grobid0:tei_json'].copy() + for k in ('body', 'annex'): + # Remove fulltext (copywritted) content + tei_metadata.pop(k, None) + info['grobid0:metadata'] = tei_metadata # Determine extraction "quality" # TODO: @@ -183,20 +185,22 @@ class MRExtractCdxGrobid(MRJob): self.increment_counter('lines', status['status']) yield _, status return + key = info['key'] # Check if we've already processed this line - oldrow = self.hb_table.row(info['key'], columns=['f', 'file', + oldrow = self.hb_table.row(key, columns=['f', 'file', 'grobid0:status_code']) if oldrow.get('grobid0:status', None): # This file has already been processed; skip it self.increment_counter('lines', 'existing') - yield _, dict(status="existing") + yield _, dict(status="existing", key=key) return # Do the extraction info, status = self.extract(info) if info is None: self.increment_counter('lines', status['status']) + status['key'] = key yield _, status return @@ -224,7 +228,7 @@ class MRExtractCdxGrobid(MRJob): self.hb_table.put(key, info) self.increment_counter('lines', 'success') - yield _, dict(status="success", grobid_status=grobid_status) + yield _, dict(status="success", grobid_status=grobid_status, key=key) if __name__ == '__main__': # pragma: no cover |