diff options
Diffstat (limited to 'mapreduce')
-rwxr-xr-x | mapreduce/extraction_cdx_grobid.py | 87 | ||||
-rwxr-xr-x | mapreduce/grobid2json.py | 21 | ||||
-rw-r--r-- | mapreduce/tests/test_extraction_cdx_grobid.py | 41 |
3 files changed, 93 insertions, 56 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py index ea36e6e..0ba95e6 100755 --- a/mapreduce/extraction_cdx_grobid.py +++ b/mapreduce/extraction_cdx_grobid.py @@ -15,16 +15,19 @@ Requires: import io import sys +import json import struct import requests import happybase import mrjob from mrjob.job import MRJob +import wayback.exception from wayback.resource import Resource from wayback.resource import ArcResource from wayback.resourcestore import ResourceStore from gwb.loader import CDXLoaderFactory from common import parse_cdx_line +from grobid2json import do_tei class MRExtractCdxGrobid(MRJob): @@ -60,16 +63,13 @@ class MRExtractCdxGrobid(MRJob): super(MRExtractCdxGrobid, self).__init__(*args, **kwargs) self.mime_filter = ['application/pdf'] - def grobid_fulltext(self, content): + def grobid_process_fulltext(self, content): r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument", files={'input': content}) if r.status_code is not 200: # XXX: - print("FAIL (Grobid: {}): {}".format(r.content.decode('utf8'))) - else: - # XXX: - print("SUCCESS: " + debug_line) - return r.json() + return None + return r def mapper_init(self): @@ -104,47 +104,56 @@ class MRExtractCdxGrobid(MRJob): return info, None - def extract(self, info): - - # Fetch data from WARCs in petabox + def fetch_warc_content(self, warc_path, offset, c_size): try: rstore = ResourceStore(loaderfactory=CDXLoaderFactory()) - gwb_record = rstore.load_resource( - info['file:cdx']['warc'], - info['file:cdx']['offset'], - info['file:cdx']['c_size']) - except IOError as ioe: - # XXX: catch correct error - self.increment_counter('lines', 'existing') - return _, dict(status="existing") + gwb_record = rstore.load_resource(warc_path, offset, c_size) + except wayback.exception.ResourceUnavailable as err: + # XXX: during testing + raise err + self.increment_counter('lines', 'petabox_error') + return None, dict(status="petabox_error", reason="failed to load file contents") if gwb_record.get_status()[0] != 200: self.increment_counter('lines', 'error') - return _, dict(status="error", reason="non-HTTP-200 WARC content") + return None, dict(status="error", reason="non-HTTP-200 WARC content") + return gwb_record.open_raw_content() + + def extract(self, info): + + # Fetch data from WARCs in petabox + content, status = self.fetch_warc_content( + info['file:cdx']['warc'], + info['file:cdx']['offset'], + info['file:cdx']['c_size']) + if status: + self.increment_counter('lines', status['status']) + return None, status + + info['file:size'] = len(content) # Submit to GROBID - content = gwb_record.open_raw_content() try: - grobid_result = self.grobid_fulltext(gwb_record.open_raw_content()) + grobid_response = self.grobid_process_fulltext(content) except IOError as ioe: + raise ioe # XXX: catch correct error - self.increment_counter('lines', 'existing') - return _, dict(status="existing") - - info['file:size'] = len(resource_data) + self.increment_counter('lines', 'fail') + return None, dict(status="fail", reason="GROBID connection") - info['grobid0:status_code'] = None - info['grobid0:quality'] = None - info['grobid0:status'] = {} - info['grobid0:tei_xml'] = None - info['grobid0:tei_json'] = {} - info['grobid0:metadata'] = {} + info['grobid0:status_code'] = grobid_response.status_code + info['grobid0:tei_xml'] = grobid_response.content + info['grobid0:status'] = {} # TODO # Convert TEI XML to JSON - # TODO + # TODO: + info['grobid0:tei_json'] = do_tei(grobid_response.content, encumbered=True) + info['grobid0:metadata'] = do_tei(grobid_response.content, encumbered=False) # Determine extraction "quality" - # TODO + # TODO: + + info['grobid0:quality'] = None return info, None @@ -187,19 +196,22 @@ class MRExtractCdxGrobid(MRJob): # Decide what to bother inserting back into HBase # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx') grobid_status = info.get('grobid0:status_code', None) - for k in info.keys(): + for k in list(info.keys()): if k in oldrow: info.pop(k) # Convert fields to binary - for k in info.keys(): - if k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json', + for k in list(info.keys()): + if info[k] == None: + info.pop(k) + elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json', 'grobid0:metadata'): assert type(info[k]) == dict info[k] = json.dumps(info[k], sort_keys=True, indent=None) - if k in ('file:size', 'grobid0:status_code'): + elif k in ('file:size', 'grobid0:status_code'): # encode as int64 in network byte order - info[k] = struct.pack('!q', info[k]) + if info[k] != {} and info[k] != None: + info[k] = struct.pack('!q', info[k]) key = info.pop('key') self.hb_table.put(key, info) @@ -207,6 +219,7 @@ class MRExtractCdxGrobid(MRJob): yield _, dict(status="success", grobid_status=grobid_status) + if __name__ == '__main__': # pragma: no cover MRExtractCdxGrobid.run() diff --git a/mapreduce/grobid2json.py b/mapreduce/grobid2json.py index daf9387..cc6eb2c 100755 --- a/mapreduce/grobid2json.py +++ b/mapreduce/grobid2json.py @@ -20,6 +20,7 @@ Prints JSON to stdout, errors to stderr """ import os +import io import sys import json import argparse @@ -73,11 +74,18 @@ def biblio_info(elem): return ref -def do_tei(path, encumbered=True): +def do_tei(content, encumbered=True): - info = dict(filename=os.path.basename(path)) + if type(content) == str: + content = io.StringIO(content) + elif type(content) == bytes: + content = io.BytesIO(content) - tree = ET.parse(path) + info = dict() + + #print(content) + #print(content.getvalue()) + tree = ET.parse(content) tei = tree.getroot() header = tei.find('.//{%s}teiHeader' % ns) @@ -109,7 +117,7 @@ def do_tei(path, encumbered=True): return info -def main(): +def main(): # pragma no cover parser = argparse.ArgumentParser( description="GROBID TEI XML to JSON", usage="%(prog)s [options] <teifile>...") @@ -121,9 +129,10 @@ def main(): args = parser.parse_args() for filename in args.teifiles: + content = open(filename, 'r') print(json.dumps( - do_tei(filename, + do_tei(content, encumbered=(not args.no_encumbered)))) -if __name__=='__main__': +if __name__=='__main__': # pragma no cover main() diff --git a/mapreduce/tests/test_extraction_cdx_grobid.py b/mapreduce/tests/test_extraction_cdx_grobid.py index 1d32c9f..46a89aa 100644 --- a/mapreduce/tests/test_extraction_cdx_grobid.py +++ b/mapreduce/tests/test_extraction_cdx_grobid.py @@ -1,13 +1,17 @@ import io import json -import pytest import mrjob +import pytest +import struct import responses import happybase_mock +from unittest import mock from extraction_cdx_grobid import MRExtractCdxGrobid +FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843) + @pytest.fixture def job(): """ @@ -22,13 +26,14 @@ def job(): job = MRExtractCdxGrobid(['--no-conf', '-'], hb_table=table) return job - +@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) @responses.activate -def test_mapper_lines(job): +def test_mapper_lines(mock_fetch, job): - fake_grobid = {} - responses.add(responses.POST, 'http://localhost:9070/api/processFulltextDocument', status=200, - body=json.dumps(fake_grobid), content_type='application/json') + with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f: + real_tei_xml = f.read() + responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, + body=real_tei_xml, content_type='application/json') raw = io.BytesIO(b""" com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz @@ -36,16 +41,23 @@ eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=76139301 com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robots.txt text/plain 200 6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD - - 638 398190140 CITESEERX-CRAWL-2017-06-20-20170705062707827-00049-00058-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705063158203-00053-31209~wbgrp-svc284.us.archive.org~8443.warc.gz """) - job.sandbox(stdin=raw) + output = io.BytesIO() + job.sandbox(stdin=raw, stdout=output) - pytest.skip("need to mock wayback fetch") job.run_mapper() + # for debugging tests + #print(output.getvalue().decode('utf-8')) + #print(list(job.hb_table.scan())) + # wayback gets FETCH 1x times + # TODO: # grobid gets POST 3x times + # TODO: # hbase + # TODO: assert job.hb_table.row(b'1') == {} @@ -58,15 +70,18 @@ com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robo row = job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') - assert struct.unpack("", row[b'file:size']) == 12345 + assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES) assert row[b'file:mime'] == b"application/pdf" - assert struct.unpack("", row[b'grobid0:status_code']) == 200 - assert row[b'grobid0:quality'] == None # TODO + assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 + # TODO: assert row[b'grobid0:quality'] == None status = json.loads(row[b'grobid0:status'].decode('utf-8')) - assert type(row[b'grobid0:status']) == type(dict()) - assert row[b'grobid0:tei_xml'] == "<xml><lorem>ipsum</lorem></xml>" + assert type(status) == type(dict()) + assert row[b'grobid0:tei_xml'].decode('utf-8') == real_tei_xml tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8')) metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8')) + assert tei_json['title'] == metadata['title'] + assert 'body' in tei_json + assert 'body' not in metadata def test_parse_cdx_invalid(job): |