diff options
Diffstat (limited to 'mapreduce')
| -rwxr-xr-x | mapreduce/extraction_cdx_grobid.py | 16 | ||||
| -rw-r--r-- | mapreduce/tests/test_extraction_cdx_grobid.py | 54 | 
2 files changed, 61 insertions, 9 deletions
| diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py index db36cac..c29b27e 100755 --- a/mapreduce/extraction_cdx_grobid.py +++ b/mapreduce/extraction_cdx_grobid.py @@ -189,8 +189,8 @@ class MRExtractCdxGrobid(MRJob):          key = info['key']          # Check if we've already processed this line -        oldrow = self.hb_table.row(key, columns=[b'f', b'file', -            b'grobid0:status_code']) +        oldrow = self.hb_table.row(key, +            columns=[b'f:c', b'file', b'grobid0:status_code'])          if oldrow.get(b'grobid0:status_code', None) != None:              # This file has already been processed; skip it              self.increment_counter('lines', 'existing') @@ -204,10 +204,11 @@ class MRExtractCdxGrobid(MRJob):              status['key'] = key              yield _, status              return +        extraction_status = status          # Decide what to bother inserting back into HBase          # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx') -        grobid_status = info.get('grobid0:status_code', None) +        grobid_status_code = info.get('grobid0:status_code', None)          for k in list(info.keys()):              if k.encode('utf-8') in oldrow:                  info.pop(k) @@ -229,7 +230,14 @@ class MRExtractCdxGrobid(MRJob):          self.hb_table.put(key, info)          self.increment_counter('lines', 'success') -        yield _, dict(status="success", grobid_status=grobid_status, key=key) +        if extraction_status is not None: +            yield _, dict(status="partial", key=key, +                grobid_status_code=grobid_status_code, +                reason=extraction_status['reason']) +        else: +            yield _, dict(status="success", +                grobid_status_code=grobid_status_code, key=key, +                extra=extraction_status)  if __name__ == '__main__': # pragma: no cover diff --git a/mapreduce/tests/test_extraction_cdx_grobid.py b/mapreduce/tests/test_extraction_cdx_grobid.py index fa6f71f..1bf2420 100644 --- a/mapreduce/tests/test_extraction_cdx_grobid.py +++ b/mapreduce/tests/test_extraction_cdx_grobid.py @@ -14,6 +14,9 @@ from extraction_cdx_grobid import MRExtractCdxGrobid, Resource  FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843)  OK_CDX_LINE = b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""" +with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f: +    REAL_TEI_XML = f.read() +  @pytest.fixture  def job():      """ @@ -34,10 +37,8 @@ def job():  @responses.activate  def test_mapper_lines(mock_fetch, job): -    with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f: -        real_tei_xml = f.read()      responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, -        body=real_tei_xml, content_type='text/xml') +        body=REAL_TEI_XML, content_type='text/xml')      raw = io.BytesIO(b"""  com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz @@ -81,7 +82,7 @@ com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robo      # TODO: assert row[b'grobid0:quality'] == None      status = json.loads(row[b'grobid0:status'].decode('utf-8'))      assert type(status) == type(dict()) -    assert row[b'grobid0:tei_xml'].decode('utf-8') == real_tei_xml +    assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML      tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8'))      metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8'))      assert tei_json['title'] == metadata['title'] @@ -183,10 +184,31 @@ def test_grobid_not_xml(mock_fetch, job):      output = io.BytesIO()      job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)      job.run_mapper() +    output = output.getvalue().decode('utf-8') +    row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') +    assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 +    assert row[b'grobid0:tei_xml'] == payload +    assert b'grobid0:tei_json' not in row +    assert "XML parse error" in output + + +@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) +@responses.activate +def test_grobid_not_tei(mock_fetch, job): + +    payload = b'<xml></xml>' +    responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, +        body=payload) + +    output = io.BytesIO() +    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output) +    job.run_mapper() +    output = output.getvalue().decode('utf-8')      row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ')      assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200      assert row[b'grobid0:tei_xml'] == payload      assert b'grobid0:tei_json' not in row +    assert "non-TEI content" in output  @mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) @@ -247,7 +269,7 @@ def test_wayback_not_found(mock_rs, job):  def test_mapper_rerun(mock_fetch, job):      responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, -        body=b"FAKE", content_type='text/xml') +        body=REAL_TEI_XML, content_type='text/xml')      output1 = io.BytesIO()      job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output1) @@ -273,3 +295,25 @@ def test_mapper_rerun(mock_fetch, job):      # grobid still only POST 1x times      assert len(responses.calls) == 1      assert 'existing' in output2 + +@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) +@responses.activate +def test_mapper_previously_backfilled(mock_fetch, job): + +    responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, +        body=REAL_TEI_XML, content_type='text/xml') + +    job.hb_table.put(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ', +        {b'f:c': b'{"some": "dict"}', b'file:col': b'bogus'}) +    assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') != {} + +    output1 = io.BytesIO() +    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output1) +    job.run_mapper() +    output1 = output1.getvalue().decode('utf-8') + +    # wayback gets FETCH 1x times +    assert mock_fetch.call_count == 1 +    # grobid gets POST 1x times +    assert len(responses.calls) == 1 +    assert 'success' in output1 | 
