From 41da591130b464e36d0b91d35796026b2d7c4088 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 10 Apr 2018 19:37:04 -0700
Subject: cleanup tests; add one for double-processing

---
 mapreduce/tests/test_extraction_cdx_grobid.py | 53 +++++++++++++++++++--------
 1 file changed, 38 insertions(+), 15 deletions(-)

(limited to 'mapreduce/tests')

diff --git a/mapreduce/tests/test_extraction_cdx_grobid.py b/mapreduce/tests/test_extraction_cdx_grobid.py
index 8549054..fa6f71f 100644
--- a/mapreduce/tests/test_extraction_cdx_grobid.py
+++ b/mapreduce/tests/test_extraction_cdx_grobid.py
@@ -12,6 +12,7 @@ from extraction_cdx_grobid import MRExtractCdxGrobid, Resource
 
 FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843)
 
+OK_CDX_LINE = b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"""
 
 @pytest.fixture
 def job():
@@ -163,10 +164,8 @@ def test_grobid_503(mock_fetch, job):
     responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=503,
         body=status)
 
-    raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""")
-
     output = io.BytesIO()
-    job.sandbox(stdin=raw, stdout=output)
+    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
     job.run_mapper()
     row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ')
     status = json.loads(row[b'grobid0:status'].decode('utf-8'))
@@ -181,10 +180,8 @@ def test_grobid_not_xml(mock_fetch, job):
     responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
         body=payload)
 
-    raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""")
-
     output = io.BytesIO()
-    job.sandbox(stdin=raw, stdout=output)
+    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
     job.run_mapper()
     row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ')
     assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200
@@ -198,10 +195,8 @@ def test_grobid_invalid_connection(mock_fetch, job):
     status = b'{"status": "done broke"}'
     job.options.grobid_uri = 'http://host.invalid:8070/api/processFulltextDocument'
 
-    raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""")
-
     output = io.BytesIO()
-    job.sandbox(stdin=raw, stdout=output)
+    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
     job.run_mapper()
     output = output.getvalue().decode('utf-8')
     assert 'error' in output
@@ -213,10 +208,8 @@ def test_wayback_failure(job):
 
     job.options.warc_uri_prefix = 'http://host.invalid/'
 
-    raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""")
-
     output = io.BytesIO()
-    job.sandbox(stdin=raw, stdout=output)
+    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
     job.run_mapper()
     output = output.getvalue().decode('utf-8')
     assert 'error' in output
@@ -238,10 +231,8 @@ def test_wayback_not_found(mock_rs, job):
 
     job.options.warc_uri_prefix = 'http://dummy-archive.org/'
 
-    raw = io.BytesIO(b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""")
-
     output = io.BytesIO()
-    job.sandbox(stdin=raw, stdout=output)
+    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
     job.run_mapper()
     output = output.getvalue().decode('utf-8')
 
@@ -250,3 +241,35 @@ def test_wayback_not_found(mock_rs, job):
     assert 'not 200' in output
     assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') == {}
 
+
+@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
+@responses.activate
+def test_mapper_rerun(mock_fetch, job):
+
+    responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
+        body=b"FAKE", content_type='text/xml')
+
+    output1 = io.BytesIO()
+    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output1)
+    job.run_mapper()
+    output1 = output1.getvalue().decode('utf-8')
+
+    # wayback gets FETCH 1x times
+    assert mock_fetch.call_count == 1
+    # grobid gets POST 1x times
+    assert len(responses.calls) == 1
+    # HBase
+    assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') != {}
+    assert 'success' in output1
+
+    # Run again, same line
+    output2 = io.BytesIO()
+    job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output2)
+    job.run_mapper()
+    output2 = output2.getvalue().decode('utf-8')
+
+    # wayback still only FETCH 1x times
+    assert mock_fetch.call_count == 1
+    # grobid still only POST 1x times
+    assert len(responses.calls) == 1
+    assert 'existing' in output2
--
cgit v1.2.3