aboutsummaryrefslogtreecommitdiffstats
path: root/mapreduce
diff options
context:
space:
mode:
Diffstat (limited to 'mapreduce')
-rwxr-xr-xmapreduce/extraction_cdx_grobid.py10
-rw-r--r--mapreduce/tests/test_extraction_cdx_grobid.py53
2 files changed, 43 insertions, 20 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py
index 708e170..db36cac 100755
--- a/mapreduce/extraction_cdx_grobid.py
+++ b/mapreduce/extraction_cdx_grobid.py
@@ -143,7 +143,7 @@ class MRExtractCdxGrobid(MRJob):
return info, dict(status="error", reason="non-200 GROBID HTTP status",
extra=grobid_response.content)
- info['grobid0:status'] = {}
+ info['grobid0:status'] = {'status': 'success'}
info['grobid0:tei_xml'] = grobid_response.content
# Convert TEI XML to JSON
@@ -189,9 +189,9 @@ class MRExtractCdxGrobid(MRJob):
key = info['key']
# Check if we've already processed this line
- oldrow = self.hb_table.row(key, columns=['f', 'file',
- 'grobid0:status_code'])
- if oldrow.get('grobid0:status', None):
+ oldrow = self.hb_table.row(key, columns=[b'f', b'file',
+ b'grobid0:status_code'])
+ if oldrow.get(b'grobid0:status_code', None) != None:
# This file has already been processed; skip it
self.increment_counter('lines', 'existing')
yield _, dict(status="existing", key=key)
@@ -209,7 +209,7 @@ class MRExtractCdxGrobid(MRJob):
# Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')
grobid_status = info.get('grobid0:status_code', None)
for k in list(info.keys()):
- if k in oldrow:
+ if k.encode('utf-8') in oldrow:
info.pop(k)
# Convert fields to binary
diff --git a/mapreduce/tests/test_extraction_cdx_grobid.py b/mapreduce/tests/test_extraction_cdx_grobid.py
index 8549054..fa6f71f 100644
--- a/mapreduce/tests/test_extraction_cdx_grobid.py
+++ b/mapreduce/tests/test_extraction_cdx_grobid.py
@@ -12,6 +12,7 @@ from extraction_cdx_grobid import MRExtractCdxGrobid, Resource
FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843)
+OK_CDX_LINE = b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"""
@pytest.fixture
def job():
@@ -163,10 +164,8 @@ def test_grobid_503(mock_fetch, job):
responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=503,
body=status)
- raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""")
-
output = io.BytesIO()
- job.sandbox(stdin=raw, stdout=output)
+ job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
job.run_mapper()
row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ')
status = json.loads(row[b'grobid0:status'].decode('utf-8'))
@@ -181,10 +180,8 @@ def test_grobid_not_xml(mock_fetch, job):
responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
body=payload)
- raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""")
-
output = io.BytesIO()
- job.sandbox(stdin=raw, stdout=output)
+ job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
job.run_mapper()
row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ')
assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200
@@ -198,10 +195,8 @@ def test_grobid_invalid_connection(mock_fetch, job):
status = b'{"status": "done broke"}'
job.options.grobid_uri = 'http://host.invalid:8070/api/processFulltextDocument'
- raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""")
-
output = io.BytesIO()
- job.sandbox(stdin=raw, stdout=output)
+ job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
job.run_mapper()
output = output.getvalue().decode('utf-8')
assert 'error' in output
@@ -213,10 +208,8 @@ def test_wayback_failure(job):
job.options.warc_uri_prefix = 'http://host.invalid/'
- raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""")
-
output = io.BytesIO()
- job.sandbox(stdin=raw, stdout=output)
+ job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
job.run_mapper()
output = output.getvalue().decode('utf-8')
assert 'error' in output
@@ -238,10 +231,8 @@ def test_wayback_not_found(mock_rs, job):
job.options.warc_uri_prefix = 'http://dummy-archive.org/'
- raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""")
-
output = io.BytesIO()
- job.sandbox(stdin=raw, stdout=output)
+ job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
job.run_mapper()
output = output.getvalue().decode('utf-8')
@@ -250,3 +241,35 @@ def test_wayback_not_found(mock_rs, job):
assert 'not 200' in output
assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') == {}
+
+@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
+@responses.activate
+def test_mapper_rerun(mock_fetch, job):
+
+ responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
+ body=b"FAKE", content_type='text/xml')
+
+ output1 = io.BytesIO()
+ job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output1)
+ job.run_mapper()
+ output1 = output1.getvalue().decode('utf-8')
+
+ # wayback gets FETCH 1x times
+ assert mock_fetch.call_count == 1
+ # grobid gets POST 1x times
+ assert len(responses.calls) == 1
+ # HBase
+ assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') != {}
+ assert 'success' in output1
+
+ # Run again, same line
+ output2 = io.BytesIO()
+ job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output2)
+ job.run_mapper()
+ output2 = output2.getvalue().decode('utf-8')
+
+ # wayback still only FETCH 1x times
+ assert mock_fetch.call_count == 1
+ # grobid still only POST 1x times
+ assert len(responses.calls) == 1
+ assert 'existing' in output2