aboutsummaryrefslogtreecommitdiffstats
path: root/mapreduce/extraction_cdx_grobid.py
diff options
context:
space:
mode:
Diffstat (limited to 'mapreduce/extraction_cdx_grobid.py')
-rwxr-xr-xmapreduce/extraction_cdx_grobid.py50
1 files changed, 27 insertions, 23 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py
index bcb8663..9a0d795 100755
--- a/mapreduce/extraction_cdx_grobid.py
+++ b/mapreduce/extraction_cdx_grobid.py
@@ -18,6 +18,7 @@ Requires:
import io
import sys
+import xml
import json
import struct
import requests
@@ -67,10 +68,6 @@ class MRExtractCdxGrobid(MRJob):
def grobid_process_fulltext(self, content):
r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
files={'input': content})
- if r.status_code is not 200:
- # if invalid file, get a 400 with JSON body with 'description' key (and others)
- # XXX:
- return None
return r
def mapper_init(self):
@@ -98,12 +95,10 @@ class MRExtractCdxGrobid(MRJob):
return None, dict(status="invalid", reason="CDX parse")
if info['file:mime'] not in self.mime_filter:
- self.increment_counter('lines', 'skip')
return None, dict(status="skip", reason="mimetype")
# If warc is not item/file.(w)arc.gz form, skip it
if len(info['file:cdx']['warc'].split('/')) != 2:
- self.increment_counter('lines', 'skip')
return None, dict(status="skip", reason="WARC path not petabox item/file")
return info, None
@@ -114,13 +109,9 @@ class MRExtractCdxGrobid(MRJob):
rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
gwb_record = rstore.load_resource(warc_uri, offset, c_size)
except wayback.exception.ResourceUnavailable as err:
- # XXX: during testing
- raise err
- self.increment_counter('lines', 'petabox_error')
return None, dict(status="petabox_error", reason="failed to load file contents")
if gwb_record.get_status()[0] != 200:
- self.increment_counter('lines', 'error')
return None, dict(status="error",
reason="archived HTTP response (WARC) was not 200",
warc_status=gwb_record.get_status()[0])
@@ -134,7 +125,6 @@ class MRExtractCdxGrobid(MRJob):
info['file:cdx']['offset'],
info['file:cdx']['c_size'])
if status:
- self.increment_counter('lines', status['status'])
return None, status
info['file:size'] = len(original_content)
@@ -142,20 +132,32 @@ class MRExtractCdxGrobid(MRJob):
# Submit to GROBID
try:
grobid_response = self.grobid_process_fulltext(original_content)
- except IOError as ioe:
- raise ioe
- # XXX: catch correct error
- self.increment_counter('lines', 'fail')
- return None, dict(status="fail", reason="GROBID connection")
+ except requests.exceptions.ConnectionError:
+ return None, dict(status="error", reason="connection to GROBID worker")
info['grobid0:status_code'] = grobid_response.status_code
+ if grobid_response.status_code is not 200:
+ # response.text is .content decoded as utf-8
+ info['grobid0:status'] = json.loads(grobid_response.text)
+ return info, dict(status="error", reason="non-200 GROBID HTTP status",
+ extra=grobid_response.content)
+
+ info['grobid0:status'] = {}
info['grobid0:tei_xml'] = grobid_response.content
- info['grobid0:status'] = {} # TODO
# Convert TEI XML to JSON
- # TODO:
- info['grobid0:tei_json'] = teixml2json(grobid_response.content, encumbered=True)
- info['grobid0:metadata'] = teixml2json(grobid_response.content, encumbered=False)
+ try:
+ info['grobid0:tei_json'] = teixml2json(grobid_response.content, encumbered=True)
+ except xml.etree.ElementTree.ParseError:
+ return info, dict(status="fail", reason="GROBID 200 XML parse error")
+ except ValueError:
+ return info, dict(status="fail", reason="GROBID 200 XML non-TEI content")
+
+ tei_metadata = info['grobid0:tei_json'].copy()
+ for k in ('body', 'annex'):
+ # Remove fulltext (copywritted) content
+ tei_metadata.pop(k, None)
+ info['grobid0:metadata'] = tei_metadata
# Determine extraction "quality"
# TODO:
@@ -183,20 +185,22 @@ class MRExtractCdxGrobid(MRJob):
self.increment_counter('lines', status['status'])
yield _, status
return
+ key = info['key']
# Check if we've already processed this line
- oldrow = self.hb_table.row(info['key'], columns=['f', 'file',
+ oldrow = self.hb_table.row(key, columns=['f', 'file',
'grobid0:status_code'])
if oldrow.get('grobid0:status', None):
# This file has already been processed; skip it
self.increment_counter('lines', 'existing')
- yield _, dict(status="existing")
+ yield _, dict(status="existing", key=key)
return
# Do the extraction
info, status = self.extract(info)
if info is None:
self.increment_counter('lines', status['status'])
+ status['key'] = key
yield _, status
return
@@ -224,7 +228,7 @@ class MRExtractCdxGrobid(MRJob):
self.hb_table.put(key, info)
self.increment_counter('lines', 'success')
- yield _, dict(status="success", grobid_status=grobid_status)
+ yield _, dict(status="success", grobid_status=grobid_status, key=key)
if __name__ == '__main__': # pragma: no cover