diff options
-rwxr-xr-x | mapreduce/extraction_cdx_grobid.py | 50 | ||||
-rw-r--r-- | mapreduce/tests/test_extraction_cdx_grobid.py | 27 |
2 files changed, 50 insertions, 27 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py index bcb8663..9a0d795 100755 --- a/mapreduce/extraction_cdx_grobid.py +++ b/mapreduce/extraction_cdx_grobid.py @@ -18,6 +18,7 @@ Requires: import io import sys +import xml import json import struct import requests @@ -67,10 +68,6 @@ class MRExtractCdxGrobid(MRJob): def grobid_process_fulltext(self, content): r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument", files={'input': content}) - if r.status_code is not 200: - # if invalid file, get a 400 with JSON body with 'description' key (and others) - # XXX: - return None return r def mapper_init(self): @@ -98,12 +95,10 @@ class MRExtractCdxGrobid(MRJob): return None, dict(status="invalid", reason="CDX parse") if info['file:mime'] not in self.mime_filter: - self.increment_counter('lines', 'skip') return None, dict(status="skip", reason="mimetype") # If warc is not item/file.(w)arc.gz form, skip it if len(info['file:cdx']['warc'].split('/')) != 2: - self.increment_counter('lines', 'skip') return None, dict(status="skip", reason="WARC path not petabox item/file") return info, None @@ -114,13 +109,9 @@ class MRExtractCdxGrobid(MRJob): rstore = ResourceStore(loaderfactory=CDXLoaderFactory()) gwb_record = rstore.load_resource(warc_uri, offset, c_size) except wayback.exception.ResourceUnavailable as err: - # XXX: during testing - raise err - self.increment_counter('lines', 'petabox_error') return None, dict(status="petabox_error", reason="failed to load file contents") if gwb_record.get_status()[0] != 200: - self.increment_counter('lines', 'error') return None, dict(status="error", reason="archived HTTP response (WARC) was not 200", warc_status=gwb_record.get_status()[0]) @@ -134,7 +125,6 @@ class MRExtractCdxGrobid(MRJob): info['file:cdx']['offset'], info['file:cdx']['c_size']) if status: - self.increment_counter('lines', status['status']) return None, status info['file:size'] = len(original_content) @@ -142,20 +132,32 @@ class MRExtractCdxGrobid(MRJob): # Submit to GROBID try: grobid_response = self.grobid_process_fulltext(original_content) - except IOError as ioe: - raise ioe - # XXX: catch correct error - self.increment_counter('lines', 'fail') - return None, dict(status="fail", reason="GROBID connection") + except requests.exceptions.ConnectionError: + return None, dict(status="error", reason="connection to GROBID worker") info['grobid0:status_code'] = grobid_response.status_code + if grobid_response.status_code is not 200: + # response.text is .content decoded as utf-8 + info['grobid0:status'] = json.loads(grobid_response.text) + return info, dict(status="error", reason="non-200 GROBID HTTP status", + extra=grobid_response.content) + + info['grobid0:status'] = {} info['grobid0:tei_xml'] = grobid_response.content - info['grobid0:status'] = {} # TODO # Convert TEI XML to JSON - # TODO: - info['grobid0:tei_json'] = teixml2json(grobid_response.content, encumbered=True) - info['grobid0:metadata'] = teixml2json(grobid_response.content, encumbered=False) + try: + info['grobid0:tei_json'] = teixml2json(grobid_response.content, encumbered=True) + except xml.etree.ElementTree.ParseError: + return info, dict(status="fail", reason="GROBID 200 XML parse error") + except ValueError: + return info, dict(status="fail", reason="GROBID 200 XML non-TEI content") + + tei_metadata = info['grobid0:tei_json'].copy() + for k in ('body', 'annex'): + # Remove fulltext (copywritted) content + tei_metadata.pop(k, None) + info['grobid0:metadata'] = tei_metadata # Determine extraction "quality" # TODO: @@ -183,20 +185,22 @@ class MRExtractCdxGrobid(MRJob): self.increment_counter('lines', status['status']) yield _, status return + key = info['key'] # Check if we've already processed this line - oldrow = self.hb_table.row(info['key'], columns=['f', 'file', + oldrow = self.hb_table.row(key, columns=['f', 'file', 'grobid0:status_code']) if oldrow.get('grobid0:status', None): # This file has already been processed; skip it self.increment_counter('lines', 'existing') - yield _, dict(status="existing") + yield _, dict(status="existing", key=key) return # Do the extraction info, status = self.extract(info) if info is None: self.increment_counter('lines', status['status']) + status['key'] = key yield _, status return @@ -224,7 +228,7 @@ class MRExtractCdxGrobid(MRJob): self.hb_table.put(key, info) self.increment_counter('lines', 'success') - yield _, dict(status="success", grobid_status=grobid_status) + yield _, dict(status="success", grobid_status=grobid_status, key=key) if __name__ == '__main__': # pragma: no cover diff --git a/mapreduce/tests/test_extraction_cdx_grobid.py b/mapreduce/tests/test_extraction_cdx_grobid.py index 713e501..729e68b 100644 --- a/mapreduce/tests/test_extraction_cdx_grobid.py +++ b/mapreduce/tests/test_extraction_cdx_grobid.py @@ -158,7 +158,7 @@ def test_parse_cdx_skip(job): @responses.activate def test_grobid_503(mock_fetch, job): - status = b"{'status': 'done broke due to 503'}" + status = b'{"status": "done broke due to 503"}' responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=503, body=status) @@ -176,9 +176,9 @@ def test_grobid_503(mock_fetch, job): @responses.activate def test_grobid_not_xml(mock_fetch, job): - status = b"{'status': 'done broke'}" + payload = b'this is not XML' responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, - body=status) + body=payload) raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""") @@ -186,5 +186,24 @@ def test_grobid_not_xml(mock_fetch, job): job.sandbox(stdin=raw, stdout=output) job.run_mapper() row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') - assert json.loads(row[b'grobid0:status'].decode('utf-8')) == status + assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 + assert row[b'grobid0:tei_xml'] == payload + assert b'grobid0:tei_json' not in row + + +@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) +@responses.activate +def test_grobid_invalid_connection(mock_fetch, job): + + status = b'{"status": "done broke"}' + job.options.grobid_uri = 'http://host.invalid:8070/api/processFulltextDocument' + + raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""") + + output = io.BytesIO() + job.sandbox(stdin=raw, stdout=output) + #with pytest.raises... + job.run_mapper() + assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') == {} +# TODO: failure to fetch from wayback |