aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xmapreduce/extraction_cdx_grobid.py50
-rw-r--r--mapreduce/tests/test_extraction_cdx_grobid.py27
2 files changed, 50 insertions, 27 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py
index bcb8663..9a0d795 100755
--- a/mapreduce/extraction_cdx_grobid.py
+++ b/mapreduce/extraction_cdx_grobid.py
@@ -18,6 +18,7 @@ Requires:
import io
import sys
+import xml
import json
import struct
import requests
@@ -67,10 +68,6 @@ class MRExtractCdxGrobid(MRJob):
def grobid_process_fulltext(self, content):
r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
files={'input': content})
- if r.status_code is not 200:
- # if invalid file, get a 400 with JSON body with 'description' key (and others)
- # XXX:
- return None
return r
def mapper_init(self):
@@ -98,12 +95,10 @@ class MRExtractCdxGrobid(MRJob):
return None, dict(status="invalid", reason="CDX parse")
if info['file:mime'] not in self.mime_filter:
- self.increment_counter('lines', 'skip')
return None, dict(status="skip", reason="mimetype")
# If warc is not item/file.(w)arc.gz form, skip it
if len(info['file:cdx']['warc'].split('/')) != 2:
- self.increment_counter('lines', 'skip')
return None, dict(status="skip", reason="WARC path not petabox item/file")
return info, None
@@ -114,13 +109,9 @@ class MRExtractCdxGrobid(MRJob):
rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
gwb_record = rstore.load_resource(warc_uri, offset, c_size)
except wayback.exception.ResourceUnavailable as err:
- # XXX: during testing
- raise err
- self.increment_counter('lines', 'petabox_error')
return None, dict(status="petabox_error", reason="failed to load file contents")
if gwb_record.get_status()[0] != 200:
- self.increment_counter('lines', 'error')
return None, dict(status="error",
reason="archived HTTP response (WARC) was not 200",
warc_status=gwb_record.get_status()[0])
@@ -134,7 +125,6 @@ class MRExtractCdxGrobid(MRJob):
info['file:cdx']['offset'],
info['file:cdx']['c_size'])
if status:
- self.increment_counter('lines', status['status'])
return None, status
info['file:size'] = len(original_content)
@@ -142,20 +132,32 @@ class MRExtractCdxGrobid(MRJob):
# Submit to GROBID
try:
grobid_response = self.grobid_process_fulltext(original_content)
- except IOError as ioe:
- raise ioe
- # XXX: catch correct error
- self.increment_counter('lines', 'fail')
- return None, dict(status="fail", reason="GROBID connection")
+ except requests.exceptions.ConnectionError:
+ return None, dict(status="error", reason="connection to GROBID worker")
info['grobid0:status_code'] = grobid_response.status_code
+ if grobid_response.status_code is not 200:
+ # response.text is .content decoded as utf-8
+ info['grobid0:status'] = json.loads(grobid_response.text)
+ return info, dict(status="error", reason="non-200 GROBID HTTP status",
+ extra=grobid_response.content)
+
+ info['grobid0:status'] = {}
info['grobid0:tei_xml'] = grobid_response.content
- info['grobid0:status'] = {} # TODO
# Convert TEI XML to JSON
- # TODO:
- info['grobid0:tei_json'] = teixml2json(grobid_response.content, encumbered=True)
- info['grobid0:metadata'] = teixml2json(grobid_response.content, encumbered=False)
+ try:
+ info['grobid0:tei_json'] = teixml2json(grobid_response.content, encumbered=True)
+ except xml.etree.ElementTree.ParseError:
+ return info, dict(status="fail", reason="GROBID 200 XML parse error")
+ except ValueError:
+ return info, dict(status="fail", reason="GROBID 200 XML non-TEI content")
+
+ tei_metadata = info['grobid0:tei_json'].copy()
+ for k in ('body', 'annex'):
+ # Remove fulltext (copywritted) content
+ tei_metadata.pop(k, None)
+ info['grobid0:metadata'] = tei_metadata
# Determine extraction "quality"
# TODO:
@@ -183,20 +185,22 @@ class MRExtractCdxGrobid(MRJob):
self.increment_counter('lines', status['status'])
yield _, status
return
+ key = info['key']
# Check if we've already processed this line
- oldrow = self.hb_table.row(info['key'], columns=['f', 'file',
+ oldrow = self.hb_table.row(key, columns=['f', 'file',
'grobid0:status_code'])
if oldrow.get('grobid0:status', None):
# This file has already been processed; skip it
self.increment_counter('lines', 'existing')
- yield _, dict(status="existing")
+ yield _, dict(status="existing", key=key)
return
# Do the extraction
info, status = self.extract(info)
if info is None:
self.increment_counter('lines', status['status'])
+ status['key'] = key
yield _, status
return
@@ -224,7 +228,7 @@ class MRExtractCdxGrobid(MRJob):
self.hb_table.put(key, info)
self.increment_counter('lines', 'success')
- yield _, dict(status="success", grobid_status=grobid_status)
+ yield _, dict(status="success", grobid_status=grobid_status, key=key)
if __name__ == '__main__': # pragma: no cover
diff --git a/mapreduce/tests/test_extraction_cdx_grobid.py b/mapreduce/tests/test_extraction_cdx_grobid.py
index 713e501..729e68b 100644
--- a/mapreduce/tests/test_extraction_cdx_grobid.py
+++ b/mapreduce/tests/test_extraction_cdx_grobid.py
@@ -158,7 +158,7 @@ def test_parse_cdx_skip(job):
@responses.activate
def test_grobid_503(mock_fetch, job):
- status = b"{'status': 'done broke due to 503'}"
+ status = b'{"status": "done broke due to 503"}'
responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=503,
body=status)
@@ -176,9 +176,9 @@ def test_grobid_503(mock_fetch, job):
@responses.activate
def test_grobid_not_xml(mock_fetch, job):
- status = b"{'status': 'done broke'}"
+ payload = b'this is not XML'
responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
- body=status)
+ body=payload)
raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""")
@@ -186,5 +186,24 @@ def test_grobid_not_xml(mock_fetch, job):
job.sandbox(stdin=raw, stdout=output)
job.run_mapper()
row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ')
- assert json.loads(row[b'grobid0:status'].decode('utf-8')) == status
+ assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200
+ assert row[b'grobid0:tei_xml'] == payload
+ assert b'grobid0:tei_json' not in row
+
+
+@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
+@responses.activate
+def test_grobid_invalid_connection(mock_fetch, job):
+
+ status = b'{"status": "done broke"}'
+ job.options.grobid_uri = 'http://host.invalid:8070/api/processFulltextDocument'
+
+ raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""")
+
+ output = io.BytesIO()
+ job.sandbox(stdin=raw, stdout=output)
+ #with pytest.raises...
+ job.run_mapper()
+ assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') == {}
+# TODO: failure to fetch from wayback