diff options
| author | Bryan Newbold <bnewbold@archive.org> | 2018-04-05 18:51:08 -0700 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@archive.org> | 2018-04-05 18:51:08 -0700 | 
| commit | 5db075beaa55b2d619798154c06c2df625346972 (patch) | |
| tree | 38897e99a26b751d3e93b1a2f5308ea6fa05eabb /mapreduce | |
| parent | 77577da13afe07b5177452122f4cee77e3357b4e (diff) | |
| download | sandcrawler-5db075beaa55b2d619798154c06c2df625346972.tar.gz sandcrawler-5db075beaa55b2d619798154c06c2df625346972.zip | |
progress on extractor
Diffstat (limited to 'mapreduce')
| -rwxr-xr-x | mapreduce/extraction_cdx_grobid.py | 87 | ||||
| -rwxr-xr-x | mapreduce/grobid2json.py | 21 | ||||
| -rw-r--r-- | mapreduce/tests/test_extraction_cdx_grobid.py | 41 | 
3 files changed, 93 insertions, 56 deletions
| diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py index ea36e6e..0ba95e6 100755 --- a/mapreduce/extraction_cdx_grobid.py +++ b/mapreduce/extraction_cdx_grobid.py @@ -15,16 +15,19 @@ Requires:  import io  import sys +import json  import struct  import requests  import happybase  import mrjob  from mrjob.job import MRJob +import wayback.exception  from wayback.resource import Resource  from wayback.resource import ArcResource  from wayback.resourcestore import ResourceStore  from gwb.loader import CDXLoaderFactory  from common import parse_cdx_line +from grobid2json import do_tei  class MRExtractCdxGrobid(MRJob): @@ -60,16 +63,13 @@ class MRExtractCdxGrobid(MRJob):          super(MRExtractCdxGrobid, self).__init__(*args, **kwargs)          self.mime_filter = ['application/pdf'] -    def grobid_fulltext(self, content): +    def grobid_process_fulltext(self, content):          r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",              files={'input': content})          if r.status_code is not 200:              # XXX: -            print("FAIL (Grobid: {}): {}".format(r.content.decode('utf8'))) -        else: -            # XXX: -            print("SUCCESS: " + debug_line) -        return r.json() +            return None +        return r      def mapper_init(self): @@ -104,47 +104,56 @@ class MRExtractCdxGrobid(MRJob):          return info, None -    def extract(self, info): - -        # Fetch data from WARCs in petabox +    def fetch_warc_content(self, warc_path, offset, c_size):          try:              rstore = ResourceStore(loaderfactory=CDXLoaderFactory()) -            gwb_record = rstore.load_resource( -                info['file:cdx']['warc'], -                info['file:cdx']['offset'], -                info['file:cdx']['c_size']) -        except IOError as ioe: -            # XXX: catch correct error -            self.increment_counter('lines', 'existing') -            return _, dict(status="existing") +            gwb_record = rstore.load_resource(warc_path, offset, c_size) +        except wayback.exception.ResourceUnavailable as err: +            # XXX: during testing +            raise err +            self.increment_counter('lines', 'petabox_error') +            return None, dict(status="petabox_error", reason="failed to load file contents")          if gwb_record.get_status()[0] != 200:              self.increment_counter('lines', 'error') -            return _, dict(status="error", reason="non-HTTP-200 WARC content") +            return None, dict(status="error", reason="non-HTTP-200 WARC content") +        return gwb_record.open_raw_content() + +    def extract(self, info): + +        # Fetch data from WARCs in petabox +        content, status = self.fetch_warc_content( +            info['file:cdx']['warc'], +            info['file:cdx']['offset'], +            info['file:cdx']['c_size']) +        if status: +            self.increment_counter('lines', status['status']) +            return None, status + +        info['file:size'] = len(content)          # Submit to GROBID -        content = gwb_record.open_raw_content()          try: -            grobid_result = self.grobid_fulltext(gwb_record.open_raw_content()) +            grobid_response = self.grobid_process_fulltext(content)          except IOError as ioe: +            raise ioe              # XXX: catch correct error -            self.increment_counter('lines', 'existing') -            return _, dict(status="existing") - -        info['file:size'] = len(resource_data) +            self.increment_counter('lines', 'fail') +            return None, dict(status="fail", reason="GROBID connection") -        info['grobid0:status_code'] = None -        info['grobid0:quality'] = None -        info['grobid0:status'] = {} -        info['grobid0:tei_xml'] = None -        info['grobid0:tei_json'] = {} -        info['grobid0:metadata'] = {} +        info['grobid0:status_code'] = grobid_response.status_code +        info['grobid0:tei_xml'] = grobid_response.content +        info['grobid0:status'] = {} # TODO          # Convert TEI XML to JSON -        # TODO +        # TODO: +        info['grobid0:tei_json'] = do_tei(grobid_response.content, encumbered=True) +        info['grobid0:metadata'] = do_tei(grobid_response.content, encumbered=False)          # Determine extraction "quality" -        # TODO +        # TODO: + +        info['grobid0:quality'] = None          return info, None @@ -187,19 +196,22 @@ class MRExtractCdxGrobid(MRJob):          # Decide what to bother inserting back into HBase          # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')          grobid_status = info.get('grobid0:status_code', None) -        for k in info.keys(): +        for k in list(info.keys()):              if k in oldrow:                  info.pop(k)          # Convert fields to binary -        for k in info.keys(): -            if k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json', +        for k in list(info.keys()): +            if info[k] == None: +                info.pop(k) +            elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',                      'grobid0:metadata'):                  assert type(info[k]) == dict                  info[k] = json.dumps(info[k], sort_keys=True, indent=None) -            if k in ('file:size', 'grobid0:status_code'): +            elif k in ('file:size', 'grobid0:status_code'):                  # encode as int64 in network byte order -                info[k] = struct.pack('!q', info[k]) +                if info[k] != {} and info[k] != None: +                    info[k] = struct.pack('!q', info[k])          key = info.pop('key')          self.hb_table.put(key, info) @@ -207,6 +219,7 @@ class MRExtractCdxGrobid(MRJob):          yield _, dict(status="success", grobid_status=grobid_status) +  if __name__ == '__main__': # pragma: no cover      MRExtractCdxGrobid.run() diff --git a/mapreduce/grobid2json.py b/mapreduce/grobid2json.py index daf9387..cc6eb2c 100755 --- a/mapreduce/grobid2json.py +++ b/mapreduce/grobid2json.py @@ -20,6 +20,7 @@ Prints JSON to stdout, errors to stderr  """  import os +import io  import sys  import json  import argparse @@ -73,11 +74,18 @@ def biblio_info(elem):      return ref -def do_tei(path, encumbered=True): +def do_tei(content, encumbered=True): -    info = dict(filename=os.path.basename(path)) +    if type(content) == str: +        content = io.StringIO(content) +    elif type(content) == bytes: +        content = io.BytesIO(content) -    tree = ET.parse(path) +    info = dict() + +    #print(content) +    #print(content.getvalue()) +    tree = ET.parse(content)      tei = tree.getroot()      header = tei.find('.//{%s}teiHeader' % ns) @@ -109,7 +117,7 @@ def do_tei(path, encumbered=True):      return info -def main(): +def main():   # pragma no cover      parser = argparse.ArgumentParser(          description="GROBID TEI XML to JSON",          usage="%(prog)s [options] <teifile>...") @@ -121,9 +129,10 @@ def main():      args = parser.parse_args()      for filename in args.teifiles: +        content = open(filename, 'r')          print(json.dumps( -            do_tei(filename, +            do_tei(content,                 encumbered=(not args.no_encumbered)))) -if __name__=='__main__': +if __name__=='__main__':   # pragma no cover      main() diff --git a/mapreduce/tests/test_extraction_cdx_grobid.py b/mapreduce/tests/test_extraction_cdx_grobid.py index 1d32c9f..46a89aa 100644 --- a/mapreduce/tests/test_extraction_cdx_grobid.py +++ b/mapreduce/tests/test_extraction_cdx_grobid.py @@ -1,13 +1,17 @@  import io  import json -import pytest  import mrjob +import pytest +import struct  import responses  import happybase_mock +from unittest import mock  from extraction_cdx_grobid import MRExtractCdxGrobid +FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843) +  @pytest.fixture  def job():      """ @@ -22,13 +26,14 @@ def job():      job = MRExtractCdxGrobid(['--no-conf', '-'], hb_table=table)      return job - +@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))  @responses.activate -def test_mapper_lines(job): +def test_mapper_lines(mock_fetch, job): -    fake_grobid = {} -    responses.add(responses.POST, 'http://localhost:9070/api/processFulltextDocument', status=200, -        body=json.dumps(fake_grobid), content_type='application/json') +    with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f: +        real_tei_xml = f.read() +    responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, +        body=real_tei_xml, content_type='application/json')      raw = io.BytesIO(b"""  com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz @@ -36,16 +41,23 @@ eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=76139301  com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robots.txt text/plain 200 6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD - - 638 398190140 CITESEERX-CRAWL-2017-06-20-20170705062707827-00049-00058-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705063158203-00053-31209~wbgrp-svc284.us.archive.org~8443.warc.gz  """) -    job.sandbox(stdin=raw) +    output = io.BytesIO() +    job.sandbox(stdin=raw, stdout=output) -    pytest.skip("need to mock wayback fetch")      job.run_mapper() +    # for debugging tests +    #print(output.getvalue().decode('utf-8')) +    #print(list(job.hb_table.scan())) +      # wayback gets FETCH 1x times +    # TODO:      # grobid gets POST 3x times +    # TODO:      # hbase  +    # TODO:      assert job.hb_table.row(b'1') == {} @@ -58,15 +70,18 @@ com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robo      row = job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') -    assert struct.unpack("", row[b'file:size']) == 12345 +    assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES)      assert row[b'file:mime'] == b"application/pdf" -    assert struct.unpack("", row[b'grobid0:status_code']) == 200 -    assert row[b'grobid0:quality'] == None # TODO +    assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 +    # TODO: assert row[b'grobid0:quality'] == None      status = json.loads(row[b'grobid0:status'].decode('utf-8')) -    assert type(row[b'grobid0:status']) == type(dict()) -    assert row[b'grobid0:tei_xml'] == "<xml><lorem>ipsum</lorem></xml>" +    assert type(status) == type(dict()) +    assert row[b'grobid0:tei_xml'].decode('utf-8') == real_tei_xml      tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8'))      metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8')) +    assert tei_json['title'] == metadata['title'] +    assert 'body' in tei_json +    assert 'body' not in metadata  def test_parse_cdx_invalid(job): | 
