aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xmapreduce/extraction_cdx_grobid.py16
-rw-r--r--mapreduce/tests/test_extraction_cdx_grobid.py54
2 files changed, 61 insertions, 9 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py
index db36cac..c29b27e 100755
--- a/mapreduce/extraction_cdx_grobid.py
+++ b/mapreduce/extraction_cdx_grobid.py
@@ -189,8 +189,8 @@ class MRExtractCdxGrobid(MRJob):
key = info['key']
# Check if we've already processed this line
- oldrow = self.hb_table.row(key, columns=[b'f', b'file',
- b'grobid0:status_code'])
+ oldrow = self.hb_table.row(key,
+ columns=[b'f:c', b'file', b'grobid0:status_code'])
if oldrow.get(b'grobid0:status_code', None) != None:
# This file has already been processed; skip it
self.increment_counter('lines', 'existing')
@@ -204,10 +204,11 @@ class MRExtractCdxGrobid(MRJob):
status['key'] = key
yield _, status
return
+ extraction_status = status
# Decide what to bother inserting back into HBase
# Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')
- grobid_status = info.get('grobid0:status_code', None)
+ grobid_status_code = info.get('grobid0:status_code', None)
for k in list(info.keys()):
if k.encode('utf-8') in oldrow:
info.pop(k)
@@ -229,7 +230,14 @@ class MRExtractCdxGrobid(MRJob):
self.hb_table.put(key, info)
self.increment_counter('lines', 'success')
- yield _, dict(status="success", grobid_status=grobid_status, key=key)
+ if extraction_status is not None:
+ yield _, dict(status="partial", key=key,
+ grobid_status_code=grobid_status_code,
+ reason=extraction_status['reason'])
+ else:
+ yield _, dict(status="success",
+ grobid_status_code=grobid_status_code, key=key,
+ extra=extraction_status)
if __name__ == '__main__': # pragma: no cover
diff --git a/mapreduce/tests/test_extraction_cdx_grobid.py b/mapreduce/tests/test_extraction_cdx_grobid.py
index fa6f71f..1bf2420 100644
--- a/mapreduce/tests/test_extraction_cdx_grobid.py
+++ b/mapreduce/tests/test_extraction_cdx_grobid.py
@@ -14,6 +14,9 @@ from extraction_cdx_grobid import MRExtractCdxGrobid, Resource
FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843)
OK_CDX_LINE = b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"""
+with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f:
+ REAL_TEI_XML = f.read()
+
@pytest.fixture
def job():
"""
@@ -34,10 +37,8 @@ def job():
@responses.activate
def test_mapper_lines(mock_fetch, job):
- with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f:
- real_tei_xml = f.read()
responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
- body=real_tei_xml, content_type='text/xml')
+ body=REAL_TEI_XML, content_type='text/xml')
raw = io.BytesIO(b"""
com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
@@ -81,7 +82,7 @@ com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robo
# TODO: assert row[b'grobid0:quality'] == None
status = json.loads(row[b'grobid0:status'].decode('utf-8'))
assert type(status) == type(dict())
- assert row[b'grobid0:tei_xml'].decode('utf-8') == real_tei_xml
+ assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML
tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8'))
metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8'))
assert tei_json['title'] == metadata['title']
@@ -183,10 +184,31 @@ def test_grobid_not_xml(mock_fetch, job):
output = io.BytesIO()
job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
job.run_mapper()
+ output = output.getvalue().decode('utf-8')
+ row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ')
+ assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200
+ assert row[b'grobid0:tei_xml'] == payload
+ assert b'grobid0:tei_json' not in row
+ assert "XML parse error" in output
+
+
+@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
+@responses.activate
+def test_grobid_not_tei(mock_fetch, job):
+
+ payload = b'<xml></xml>'
+ responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
+ body=payload)
+
+ output = io.BytesIO()
+ job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
+ job.run_mapper()
+ output = output.getvalue().decode('utf-8')
row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ')
assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200
assert row[b'grobid0:tei_xml'] == payload
assert b'grobid0:tei_json' not in row
+ assert "non-TEI content" in output
@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
@@ -247,7 +269,7 @@ def test_wayback_not_found(mock_rs, job):
def test_mapper_rerun(mock_fetch, job):
responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
- body=b"FAKE", content_type='text/xml')
+ body=REAL_TEI_XML, content_type='text/xml')
output1 = io.BytesIO()
job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output1)
@@ -273,3 +295,25 @@ def test_mapper_rerun(mock_fetch, job):
# grobid still only POST 1x times
assert len(responses.calls) == 1
assert 'existing' in output2
+
+@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
+@responses.activate
+def test_mapper_previously_backfilled(mock_fetch, job):
+
+ responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
+ body=REAL_TEI_XML, content_type='text/xml')
+
+ job.hb_table.put(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ',
+ {b'f:c': b'{"some": "dict"}', b'file:col': b'bogus'})
+ assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') != {}
+
+ output1 = io.BytesIO()
+ job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output1)
+ job.run_mapper()
+ output1 = output1.getvalue().decode('utf-8')
+
+ # wayback gets FETCH 1x times
+ assert mock_fetch.call_count == 1
+ # grobid gets POST 1x times
+ assert len(responses.calls) == 1
+ assert 'success' in output1