diff options
Diffstat (limited to 'mapreduce')
-rwxr-xr-x | mapreduce/extraction_cdx_grobid.py | 10 | ||||
-rw-r--r-- | mapreduce/tests/test_extraction_cdx_grobid.py | 53 |
2 files changed, 43 insertions, 20 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py index 708e170..db36cac 100755 --- a/mapreduce/extraction_cdx_grobid.py +++ b/mapreduce/extraction_cdx_grobid.py @@ -143,7 +143,7 @@ class MRExtractCdxGrobid(MRJob): return info, dict(status="error", reason="non-200 GROBID HTTP status", extra=grobid_response.content) - info['grobid0:status'] = {} + info['grobid0:status'] = {'status': 'success'} info['grobid0:tei_xml'] = grobid_response.content # Convert TEI XML to JSON @@ -189,9 +189,9 @@ class MRExtractCdxGrobid(MRJob): key = info['key'] # Check if we've already processed this line - oldrow = self.hb_table.row(key, columns=['f', 'file', - 'grobid0:status_code']) - if oldrow.get('grobid0:status', None): + oldrow = self.hb_table.row(key, columns=[b'f', b'file', + b'grobid0:status_code']) + if oldrow.get(b'grobid0:status_code', None) != None: # This file has already been processed; skip it self.increment_counter('lines', 'existing') yield _, dict(status="existing", key=key) @@ -209,7 +209,7 @@ class MRExtractCdxGrobid(MRJob): # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx') grobid_status = info.get('grobid0:status_code', None) for k in list(info.keys()): - if k in oldrow: + if k.encode('utf-8') in oldrow: info.pop(k) # Convert fields to binary diff --git a/mapreduce/tests/test_extraction_cdx_grobid.py b/mapreduce/tests/test_extraction_cdx_grobid.py index 8549054..fa6f71f 100644 --- a/mapreduce/tests/test_extraction_cdx_grobid.py +++ b/mapreduce/tests/test_extraction_cdx_grobid.py @@ -12,6 +12,7 @@ from extraction_cdx_grobid import MRExtractCdxGrobid, Resource FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843) +OK_CDX_LINE = b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""" @pytest.fixture def job(): @@ -163,10 +164,8 @@ def test_grobid_503(mock_fetch, job): responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=503, body=status) - raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""") - output = io.BytesIO() - job.sandbox(stdin=raw, stdout=output) + job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output) job.run_mapper() row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') status = json.loads(row[b'grobid0:status'].decode('utf-8')) @@ -181,10 +180,8 @@ def test_grobid_not_xml(mock_fetch, job): responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, body=payload) - raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""") - output = io.BytesIO() - job.sandbox(stdin=raw, stdout=output) + job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output) job.run_mapper() row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 @@ -198,10 +195,8 @@ def test_grobid_invalid_connection(mock_fetch, job): status = b'{"status": "done broke"}' job.options.grobid_uri = 'http://host.invalid:8070/api/processFulltextDocument' - raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""") - output = io.BytesIO() - job.sandbox(stdin=raw, stdout=output) + job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output) job.run_mapper() output = output.getvalue().decode('utf-8') assert 'error' in output @@ -213,10 +208,8 @@ def test_wayback_failure(job): job.options.warc_uri_prefix = 'http://host.invalid/' - raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""") - output = io.BytesIO() - job.sandbox(stdin=raw, stdout=output) + job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output) job.run_mapper() output = output.getvalue().decode('utf-8') assert 'error' in output @@ -238,10 +231,8 @@ def test_wayback_not_found(mock_rs, job): job.options.warc_uri_prefix = 'http://dummy-archive.org/' - raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""") - output = io.BytesIO() - job.sandbox(stdin=raw, stdout=output) + job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output) job.run_mapper() output = output.getvalue().decode('utf-8') @@ -250,3 +241,35 @@ def test_wayback_not_found(mock_rs, job): assert 'not 200' in output assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') == {} + +@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) +@responses.activate +def test_mapper_rerun(mock_fetch, job): + + responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, + body=b"FAKE", content_type='text/xml') + + output1 = io.BytesIO() + job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output1) + job.run_mapper() + output1 = output1.getvalue().decode('utf-8') + + # wayback gets FETCH 1x times + assert mock_fetch.call_count == 1 + # grobid gets POST 1x times + assert len(responses.calls) == 1 + # HBase + assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') != {} + assert 'success' in output1 + + # Run again, same line + output2 = io.BytesIO() + job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output2) + job.run_mapper() + output2 = output2.getvalue().decode('utf-8') + + # wayback still only FETCH 1x times + assert mock_fetch.call_count == 1 + # grobid still only POST 1x times + assert len(responses.calls) == 1 + assert 'existing' in output2 |