diff options
Diffstat (limited to 'mapreduce')
| -rwxr-xr-x | mapreduce/extraction_cdx_grobid.py | 10 | ||||
| -rw-r--r-- | mapreduce/tests/test_extraction_cdx_grobid.py | 53 | 
2 files changed, 43 insertions, 20 deletions
| diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py index 708e170..db36cac 100755 --- a/mapreduce/extraction_cdx_grobid.py +++ b/mapreduce/extraction_cdx_grobid.py @@ -143,7 +143,7 @@ class MRExtractCdxGrobid(MRJob):              return info, dict(status="error", reason="non-200 GROBID HTTP status",                  extra=grobid_response.content) -        info['grobid0:status'] = {} +        info['grobid0:status'] = {'status': 'success'}          info['grobid0:tei_xml'] = grobid_response.content          # Convert TEI XML to JSON @@ -189,9 +189,9 @@ class MRExtractCdxGrobid(MRJob):          key = info['key']          # Check if we've already processed this line -        oldrow = self.hb_table.row(key, columns=['f', 'file', -            'grobid0:status_code']) -        if oldrow.get('grobid0:status', None): +        oldrow = self.hb_table.row(key, columns=[b'f', b'file', +            b'grobid0:status_code']) +        if oldrow.get(b'grobid0:status_code', None) != None:              # This file has already been processed; skip it              self.increment_counter('lines', 'existing')              yield _, dict(status="existing", key=key) @@ -209,7 +209,7 @@ class MRExtractCdxGrobid(MRJob):          # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')          grobid_status = info.get('grobid0:status_code', None)          for k in list(info.keys()): -            if k in oldrow: +            if k.encode('utf-8') in oldrow:                  info.pop(k)          # Convert fields to binary diff --git a/mapreduce/tests/test_extraction_cdx_grobid.py b/mapreduce/tests/test_extraction_cdx_grobid.py index 8549054..fa6f71f 100644 --- a/mapreduce/tests/test_extraction_cdx_grobid.py +++ b/mapreduce/tests/test_extraction_cdx_grobid.py @@ -12,6 +12,7 @@ from extraction_cdx_grobid import MRExtractCdxGrobid, Resource  FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843) +OK_CDX_LINE = b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"""  @pytest.fixture  def job(): @@ -163,10 +164,8 @@ def test_grobid_503(mock_fetch, job):      responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=503,          body=status) -    raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""") -      output = io.BytesIO() -    job.sandbox(stdin=raw, stdout=output) +    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)      job.run_mapper()      row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ')      status = json.loads(row[b'grobid0:status'].decode('utf-8')) @@ -181,10 +180,8 @@ def test_grobid_not_xml(mock_fetch, job):      responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,          body=payload) -    raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""") -      output = io.BytesIO() -    job.sandbox(stdin=raw, stdout=output) +    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)      job.run_mapper()      row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ')      assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 @@ -198,10 +195,8 @@ def test_grobid_invalid_connection(mock_fetch, job):      status = b'{"status": "done broke"}'      job.options.grobid_uri = 'http://host.invalid:8070/api/processFulltextDocument' -    raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""") -      output = io.BytesIO() -    job.sandbox(stdin=raw, stdout=output) +    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)      job.run_mapper()      output = output.getvalue().decode('utf-8')      assert 'error' in output @@ -213,10 +208,8 @@ def test_wayback_failure(job):      job.options.warc_uri_prefix = 'http://host.invalid/' -    raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""") -      output = io.BytesIO() -    job.sandbox(stdin=raw, stdout=output) +    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)      job.run_mapper()      output = output.getvalue().decode('utf-8')      assert 'error' in output @@ -238,10 +231,8 @@ def test_wayback_not_found(mock_rs, job):      job.options.warc_uri_prefix = 'http://dummy-archive.org/' -    raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""") -      output = io.BytesIO() -    job.sandbox(stdin=raw, stdout=output) +    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)      job.run_mapper()      output = output.getvalue().decode('utf-8') @@ -250,3 +241,35 @@ def test_wayback_not_found(mock_rs, job):      assert 'not 200' in output      assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') == {} + +@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) +@responses.activate +def test_mapper_rerun(mock_fetch, job): + +    responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, +        body=b"FAKE", content_type='text/xml') + +    output1 = io.BytesIO() +    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output1) +    job.run_mapper() +    output1 = output1.getvalue().decode('utf-8') + +    # wayback gets FETCH 1x times +    assert mock_fetch.call_count == 1 +    # grobid gets POST 1x times +    assert len(responses.calls) == 1 +    # HBase +    assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') != {} +    assert 'success' in output1 + +    # Run again, same line +    output2 = io.BytesIO() +    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output2) +    job.run_mapper() +    output2 = output2.getvalue().decode('utf-8') + +    # wayback still only FETCH 1x times +    assert mock_fetch.call_count == 1 +    # grobid still only POST 1x times +    assert len(responses.calls) == 1 +    assert 'existing' in output2 | 
