diff options
Diffstat (limited to 'mapreduce')
-rwxr-xr-x | mapreduce/extraction_cdx_grobid.py | 16 | ||||
-rw-r--r-- | mapreduce/tests/test_extraction_cdx_grobid.py | 54 |
2 files changed, 61 insertions, 9 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py index db36cac..c29b27e 100755 --- a/mapreduce/extraction_cdx_grobid.py +++ b/mapreduce/extraction_cdx_grobid.py @@ -189,8 +189,8 @@ class MRExtractCdxGrobid(MRJob): key = info['key'] # Check if we've already processed this line - oldrow = self.hb_table.row(key, columns=[b'f', b'file', - b'grobid0:status_code']) + oldrow = self.hb_table.row(key, + columns=[b'f:c', b'file', b'grobid0:status_code']) if oldrow.get(b'grobid0:status_code', None) != None: # This file has already been processed; skip it self.increment_counter('lines', 'existing') @@ -204,10 +204,11 @@ class MRExtractCdxGrobid(MRJob): status['key'] = key yield _, status return + extraction_status = status # Decide what to bother inserting back into HBase # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx') - grobid_status = info.get('grobid0:status_code', None) + grobid_status_code = info.get('grobid0:status_code', None) for k in list(info.keys()): if k.encode('utf-8') in oldrow: info.pop(k) @@ -229,7 +230,14 @@ class MRExtractCdxGrobid(MRJob): self.hb_table.put(key, info) self.increment_counter('lines', 'success') - yield _, dict(status="success", grobid_status=grobid_status, key=key) + if extraction_status is not None: + yield _, dict(status="partial", key=key, + grobid_status_code=grobid_status_code, + reason=extraction_status['reason']) + else: + yield _, dict(status="success", + grobid_status_code=grobid_status_code, key=key, + extra=extraction_status) if __name__ == '__main__': # pragma: no cover diff --git a/mapreduce/tests/test_extraction_cdx_grobid.py b/mapreduce/tests/test_extraction_cdx_grobid.py index fa6f71f..1bf2420 100644 --- a/mapreduce/tests/test_extraction_cdx_grobid.py +++ b/mapreduce/tests/test_extraction_cdx_grobid.py @@ -14,6 +14,9 @@ from extraction_cdx_grobid import MRExtractCdxGrobid, Resource FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843) OK_CDX_LINE = b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""" +with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f: + REAL_TEI_XML = f.read() + @pytest.fixture def job(): """ @@ -34,10 +37,8 @@ def job(): @responses.activate def test_mapper_lines(mock_fetch, job): - with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f: - real_tei_xml = f.read() responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, - body=real_tei_xml, content_type='text/xml') + body=REAL_TEI_XML, content_type='text/xml') raw = io.BytesIO(b""" com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz @@ -81,7 +82,7 @@ com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robo # TODO: assert row[b'grobid0:quality'] == None status = json.loads(row[b'grobid0:status'].decode('utf-8')) assert type(status) == type(dict()) - assert row[b'grobid0:tei_xml'].decode('utf-8') == real_tei_xml + assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8')) metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8')) assert tei_json['title'] == metadata['title'] @@ -183,10 +184,31 @@ def test_grobid_not_xml(mock_fetch, job): output = io.BytesIO() job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output) job.run_mapper() + output = output.getvalue().decode('utf-8') + row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') + assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 + assert row[b'grobid0:tei_xml'] == payload + assert b'grobid0:tei_json' not in row + assert "XML parse error" in output + + +@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) +@responses.activate +def test_grobid_not_tei(mock_fetch, job): + + payload = b'<xml></xml>' + responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, + body=payload) + + output = io.BytesIO() + job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output) + job.run_mapper() + output = output.getvalue().decode('utf-8') row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 assert row[b'grobid0:tei_xml'] == payload assert b'grobid0:tei_json' not in row + assert "non-TEI content" in output @mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) @@ -247,7 +269,7 @@ def test_wayback_not_found(mock_rs, job): def test_mapper_rerun(mock_fetch, job): responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, - body=b"FAKE", content_type='text/xml') + body=REAL_TEI_XML, content_type='text/xml') output1 = io.BytesIO() job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output1) @@ -273,3 +295,25 @@ def test_mapper_rerun(mock_fetch, job): # grobid still only POST 1x times assert len(responses.calls) == 1 assert 'existing' in output2 + +@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) +@responses.activate +def test_mapper_previously_backfilled(mock_fetch, job): + + responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, + body=REAL_TEI_XML, content_type='text/xml') + + job.hb_table.put(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ', + {b'f:c': b'{"some": "dict"}', b'file:col': b'bogus'}) + assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') != {} + + output1 = io.BytesIO() + job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output1) + job.run_mapper() + output1 = output1.getvalue().decode('utf-8') + + # wayback gets FETCH 1x times + assert mock_fetch.call_count == 1 + # grobid gets POST 1x times + assert len(responses.calls) == 1 + assert 'success' in output1 |