| author | Bryan Newbold <bnewbold@archive.org> | 2018-04-04 11:47:41 -0700 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@archive.org> | 2018-04-04 11:47:41 -0700 | 
| commit | e8eb959fbdd5d13cd53421ddf2487811d049c4e8 (patch) | |
| tree | fbc8c052aac7d4eeb83da0a2d181fb585d2e4a8b /extraction | |
| parent | 7056c83d4a6bc107155eedb1b39f38dc6d290a39 (diff) | |
| download | sandcrawler-e8eb959fbdd5d13cd53421ddf2487811d049c4e8.tar.gz sandcrawler-e8eb959fbdd5d13cd53421ddf2487811d049c4e8.zip | |
more WIP on extractor
Diffstat (limited to 'extraction')
| -rw-r--r-- | extraction/TODO | 2 |
| -rw-r--r-- | extraction/extraction.py | 52 |
| -rwxr-xr-x | extraction/extraction_cdx_grobid.py | 248 |
| -rw-r--r-- | extraction/tests/test_extraction_cdx_grobid.py | 169 |
| -rw-r--r-- | extraction/xml2json.py | 8 |
5 files changed, 427 insertions, 52 deletions
diff --git a/extraction/TODO b/extraction/TODO
new file mode 100644
index 0000000..ed10834
--- /dev/null
+++ b/extraction/TODO
@@ -0,0 +1,2 @@
+- abstract CDX line reading and HBase stuff out into a common library
+- actual GROBID_SERVER="http://wbgrp-svc096.us.archive.org:8070"
diff --git a/extraction/extraction.py b/extraction/extraction.py
deleted file mode 100644
index cdca433..0000000
--- a/extraction/extraction.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/usr/bin/env python3
-
-import io
-import sys
-import requests
-#import happybase
-import mrjob
-from mrjob.job import MRJob
-from wayback.resource import Resource
-from wayback.resource import ArcResource
-from wayback.resourcestore import ResourceStore
-from gwb.loader import CDXLoaderFactory
-
-
-def process_pdf_using_grobid(content_buffer, debug_line):
-    """Query GrobId server & process response
-    """
-    GROBID_SERVER="http://wbgrp-svc096.us.archive.org:8070"
-    content = content_buffer.read()
-    r = requests.post(GROBID_SERVER + "/api/processFulltextDocument",
-            files={'input': content})
-    if r.status_code is not 200:
-        print("FAIL (Grobid: {}): {}".format(r.content.decode('utf8'), debug_line))
-    else:
-        print("SUCCESS: " + debug_line)
-
-class Cdx_Record_Pipeline(object):
-
-    def read_cdx_and_parse(self, parser_func, accepted_mimes = []):
-        """Read in CDX lines and process PDF records fetched over HTTP
-        """
-        rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
-        for line in sys.stdin:
-            line = line.rstrip()
-            cdx_line = line.split()
-            #ignoring NLine offset
-            if len(cdx_line) != 12:
-                continue
-            cdx_line = cdx_line[1:]
-            (src_url, timestamp, mime, record_location, record_offset, record_length) = (cdx_line[2], cdx_line[1], cdx_line[3], cdx_line[-1], cdx_line[-2], cdx_line[-3])
-            if '-' == record_length or not record_location.endswith('arc.gz') or mime not in accepted_mimes:
-                continue
-            orig_url = cdx_line[2]
-            debug_line = ' '.join(cdx_line)
-            try:
-                record_location = 'http://archive.org/download/' + record_location
-                record_offset = int(record_offset)
-                record_length = int(record_length)
-                resource_data = rstore.load_resource(record_location, record_offset, record_length)
-                parser_func(resource_data.open_raw_content(), debug_line)
-            except:
-                continue
diff --git a/extraction/extraction_cdx_grobid.py b/extraction/extraction_cdx_grobid.py
new file mode 100755
index 0000000..54d8b71
--- /dev/null
+++ b/extraction/extraction_cdx_grobid.py
@@ -0,0 +1,248 @@
+#!/usr/bin/env python3
+"""
+Streaming Hadoop script to extract metadata and body from fulltext (e.g.,
+PDF) files using GROBID. Input is a CDX file; results primarily go to HBase,
+with status written to a configurable output stream.
+
+Fulltext files are loaded directly from WARC files in petabox, instead of going
+through the wayback replay.
+
+Requires:
+- happybase
+- mrjob
+- wayback/GWB libraries
+"""
+
+import io
+import sys
+import struct
+import requests
+import happybase
+import mrjob
+from mrjob.job import MRJob
+from wayback.resource import Resource
+from wayback.resource import ArcResource
+from wayback.resourcestore import ResourceStore
+from gwb.loader import CDXLoaderFactory
+
+
+def parse_cdx_line(raw_cdx):
+
+    cdx = raw_cdx.split()
+    if len(cdx) < 11:
+        return None
+
+    surt = cdx[0]
+    dt = cdx[1]
+    url = cdx[2]
+    mime = normalize_mime(cdx[3])
+    http_status = cdx[4]
+    key = cdx[5]
+    c_size = cdx[8]
+    offset = cdx[9]
+    warc = cdx[10]
+
+    if not (key.isalnum() and c_size.isdigit() and offset.isdigit()
+            and http_status == "200" and len(key) == 32 and dt.isdigit()):
+        return None
+
+    if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc):
+        return None
+
+    key = "sha1:{}".format(key)
+
+    info = dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
+        offset=int(offset), warc=warc)
+
+    warc_file = warc.split('/')[-1]
+    dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
+    try:
+        dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
+    except:
+        return None
+
+    # 'i' intentionally not set
+    heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
+    return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix}
+
+
+class MRExtractCdxGrobid(MrJob):
+
+    # CDX lines in; JSON status out
+    INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
+    OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
+
+    def configure_args(self):
+        super(MRExtractCdxGrobid, self).configure_args()
+
+        self.add_passthru_arg('--hbase-table',
+                              type=str,
+                              default='wbgrp-journal-extract-0-qa',
+                              help='HBase table to backfill into (must exist)')
+        self.add_passthru_arg('--hbase-host',
+                              type=str,
+                              default='localhost',
+                              help='HBase thrift API host to connect to')
+        self.add_passthru_arg('--grobid-uri',
+                              type=str,
+                              default='http://localhost:8070',
+                              help='URI of GROBID API Server')
+
+    def __init__(self, *args, **kwargs):
+
+        # Allow passthrough for tests
+        if 'hb_table' in kwargs:
+            self.hb_table = kwargs.pop('hb_table')
+        else:
+            self.hb_table = None
+
+        super(MRExtractCdxGrobid, self).__init__(*args, **kwargs)
+        self.mime_filter = ['application/pdf']
+
+    def grobid_fulltext(self, content):
+        r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
+            files={'input': content})
+        if r.status_code is not 200:
+            # XXX:
+            print("FAIL (Grobid: {}): {}".format(r.content.decode('utf8')))
+        else:
+            # XXX:
+            print("SUCCESS: " + debug_line)
+        return r.json()
+
+    def mapper_init(self):
+
+        if self.hb_table is None:
+            try:
+                host = self.options.hbase_host
+                # TODO: make these configs accessible from... mrconf.cfg?
+                hb_conn = happybase.Connection(host=host, transport="framed",
+                    protocol="compact")
+            except Exception as err:
+                raise Exception("Couldn't connect to HBase using host: {}".format(host))
+            self.hb_table = hb_conn.table(self.options.hbase_table)
+
+    def parse_line(self, raw_cdx):
+
+        if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or
+                raw_cdx.startswith('#')):
+            return None, dict(status="invalid", reason="line prefix")
+
+        info = parse_cdx_line(raw_cdx)
+        if info is None:
+            return None, dict(status="invalid", reason="CDX parse")
+
+        if info['file:mime'] not in self.mime_filter:
+            self.increment_counter('lines', 'skip')
+            return None, dict(status="skip", reason="mimetype")
+
+        # If warc is not item/file.(w)arc.gz form, skip it
+        if len(info['file:cdx']['warc'].split('/')) != 2:
+            self.increment_counter('lines', 'skip')
+            return None, dict(status="skip", reason="WARC path not petabox item/file")
+
+        return info, None
+
+    def extract(self, info):
+
+        # Fetch data from WARCs in petabox
+        try:
+            rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
+            gwb_record = rstore.load_resource(
+                info['file:cdx']['warc'],
+                info['file:cdx']['offset'],
+                info['file:cdx']['c_size'])
+        except IOError as ioe:
+            # XXX: catch correct error
+            self.increment_counter('lines', 'existing')
+            return _, dict(status="existing")
+
+        if gwb_record.get_status()[0] != 200:
+            self.increment_counter('lines', 'error')
+            return _, dict(status="error", reason="non-HTTP-200 WARC content")
+
+        # Submit to GROBID
+        content = gwb_record.open_raw_content()
+        try:
+            grobid_result = self.grobid_fulltext(gwb_record.open_raw_content())
+        except IOError as ioe:
+            # XXX: catch correct error
+            self.increment_counter('lines', 'existing')
+            return _, dict(status="existing")
+
+        info['file:size'] = len(resource_data)
+
+        info['grobid0:status_code'] = None
+        info['grobid0:quality'] = None
+        info['grobid0:status'] = {}
+        info['grobid0:tei_xml'] = None
+        info['grobid0:tei_json'] = {}
+        info['grobid0:metadata'] = {}
+
+        # Convert TEI XML to JSON
+        # TODO
+
+        # Determine extraction "quality"
+        # TODO
+
+        return info, None
+
+    def mapper(self, _, raw_cdx):
+        """
+        1. parse CDX line
+        2. check what is already in hbase
+          3. fetch data from wayback
+          4. submit to GROBID
+            5. convert GROBID response to JSON (and metadata)
+            6. determine "quality"
+          7. push results to hbase
+        """
+
+        self.increment_counter('lines', 'total')
+
+        # Parse line and filter down
+        info, status = self.parse_line(raw_cdx)
+        if info is None:
+            self.increment_counter('lines', status['status'])
+            return _, status
+
+        # Check if we've already processed this line
+        oldrow = self.hb_table.get(info['key'], columns=['f', 'file',
+            'grobid:status_code'])
+        if row.get('grobid0:status', None):
+            # This file has already been processed; skip it
+            self.increment_counter('lines', 'existing')
+            return _, dict(status="existing")
+
+        # Do the extraction
+        info, status = self.extract(info)
+        if info is None:
+            self.increment_counter('lines', status['status'])
+            return _, status
+
+        # Decide what to bother inserting back into HBase
+        # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')
+        grobid_status = info.get('grobid0:status_code', None)
+        for k in info.keys():
+            if k in oldrow:
+                info.pop(k)
+
+        # Convert fields to binary
+        for k in info.keys():
+            if k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
+                    'grobid0:metadata'):
+                assert type(info[k]) == dict
+                info[k] = json.dumps(info[k], sort_keys=True, indent=None)
+            if k in ('file:size', 'grobid0:status_code'):
+                # encode as int64 in network byte order
+                info[k] = struct.pack('!q', info[k])
+
+        key = info.pop('key')
+        self.hb_table.put(key, info)
+        self.increment_counter('lines', 'success')
+
+        yield _, dict(status="success", grobid_status=grobid_status)
+
+if __name__ == '__main__':
+    MRExtractCdxGrobid.run()
+
diff --git a/extraction/tests/test_extraction_cdx_grobid.py b/extraction/tests/test_extraction_cdx_grobid.py
new file mode 100644
index 0000000..71b55a3
--- /dev/null
+++ b/extraction/tests/test_extraction_cdx_grobid.py
@@ -0,0 +1,169 @@
+
+import io
+import json
+import pytest
+import mrjob
+import responses
+import happybase_mock
+from extraction_cdx_grobid import MRExtractCDXGROBID
+
+
+
+def test_parse_cdx_line():
+
+    raw = "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
+    correct = {
+        'key': "sha1:WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G",
+        'file:mime': "application/pdf",
+        'file:cdx': {
+            'surt': "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf",
+            'url': "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf",
+            'dt': "20170828233154",
+            'warc': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+            'offset': 931661233,
+            'c_size': 210251,
+        },
+        'f:c': {
+            'u': "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf",
+            'd': "2017-08-28T23:31:54",
+            'f': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+            'o': 931661233,
+            'c': 1,
+        }
+    }
+
+    assert transform_line(raw) == correct
+    assert transform_line(raw + "\n") == correct
+    assert transform_line(raw + " extra_field") == correct
+
+@pytest.fixture
+def job():
+    """
+    Note: this mock only seems to work with job.run_mapper(), not job.run();
+    the later results in a separate instantiation without the mock?
+    """
+    conn = happybase_mock.Connection()
+    conn.create_table('wbgrp-journal-extract-test',
+        {'file': {}, 'grobid0': {}, 'f': {}})
+    table = conn.table('wbgrp-journal-extract-test')
+
+    job = MRCDXBackfillHBase(['--no-conf', '-'], hb_table=table)
+    return job
+
+
+@responses.activate
+def test_mapper_lines(job):
+
+    fake_grobid = {}
+    responses.add(responses.POST, 'http://localhost:9070/api/processFulltextDocument', status=200,
+        body=json.dumps(fake_grobid), content_type='application/json')
+
+    raw = io.BytesIO(b"""
+com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
+eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1 20170705062202 http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1 application/PDF 200 MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J - - 854156 328850624 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
+com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robots.txt text/plain 200 6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD - - 638 398190140 CITESEERX-CRAWL-2017-06-20-20170705062707827-00049-00058-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705063158203-00053-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
+""")
+
+    job.sandbox(stdin=raw)
+    job.run_mapper()
+
+    # wayback gets FETCH 1x times
+
+    # grobid gets POST 3x times
+
+    # hbase
+
+
+    assert job.hb_table.row(b'1') == {}
+    # HTTP 301
+    assert job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') == {}
+    # valid
+    assert job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') != {}
+    # text/plain
+    assert job.hb_table.row(b'sha1:6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD') == {}
+
+    row = job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J')
+
+    assert struct.unpack("", row[b'file:size']) == 12345
+    assert row[b'file:mime'] == b"application/pdf"
+    assert struct.unpack("", row[b'grobid0:status_code']) == 200
+    assert row[b'grobid0:quality'] == None # TODO
+    status = json.loads(row[b'grobid0:status'].decode('utf-8'))
+    assert type(row[b'grobid0:status']) == type(dict())
+    assert row[b'grobid0:tei_xml'] == "<xml><lorem>ipsum</lorem></xml>"
+    tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8'))
+    metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8'))
+
+def test_parse_cdx_invalid(job):
+
+    print("valid")
+    raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+    info, status = job.parse_line(raw)
+    assert status is None
+
+    print("space-prefixed line")
+    raw = io.BytesIO(b" com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+    info, status = job.parse_line(raw)
+    assert info is None
+    assert status['status'] == "invalid"
+    assert 'prefix' in status['reason']
+
+    print("commented line")
+    raw = io.BytesIO(b"#com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+    info, status = job.parse_line(raw)
+    assert info is None
+    assert status['status'] == "invalid"
+    assert 'prefix' in status['reason']
+
+    print("wrong column count")
+    raw = io.BytesIO(b"a b c d")
+    info, status = job.parse_line(raw)
+    assert info is None
+    assert status['status'] == "invalid"
+    assert 'parse' in status['reason']
+
+    print("missing mimetype")
+    raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+    info, status = job.parse_line(raw)
+    assert info is None
+    assert status['status'] == "invalid"
+    assert 'parse' in status['reason']
+
+    print("HTTP status")
+    raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 501 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+    info, status = job.parse_line(raw)
+    assert info is None
+    assert status['status'] == "invalid"
+
+    print("datetime")
+    raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 501 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+    info, status = job.parse_line(raw)
+    assert info is None
+    assert status['status'] == "invalid"
+
+
+def test_parse_cdx_skip(job):
+
+    print("warc format")
+    raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+    info, status = job.mapper(raw)
+    assert info is None
+    assert status['status'] == "skip"
+    assert 'WARC' in status['reason']
+
+    print("mimetype")
+    raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+    info, status = job.mapper(raw)
+    assert info is None
+    assert status['status'] == "skip"
+    assert 'mimetype' in status['reason']
+
+def test_tei_json_convert():
+    # TODO: load xml test vector, run it through
+    with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f:
+        xml_content = f.read()
+    pass
+
+def test_tei_json_convert_invalid():
+    # TODO: pass in junk
+    pass
diff --git a/extraction/xml2json.py b/extraction/xml2json.py
new file mode 100644
index 0000000..f956014
--- /dev/null
+++ b/extraction/xml2json.py
@@ -0,0 +1,8 @@
+
+import json
+import sys
+import xmltodict
+
+with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'rb') as f:
+    thing = xmltodict.parse(f, process_namespaces=False)
+    print(json.dumps(thing))
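Note on the GROBID call in extraction_cdx_grobid.py: the WIP grobid_fulltext() returns r.json(), but GROBID's /api/processFulltextDocument endpoint responds with TEI XML rather than JSON, so downstream code most likely wants the raw response text. A minimal sketch of that submission step (grobid_process_fulltext and GROBID_URI are illustrative names, not part of this commit):

```python
import requests

GROBID_URI = "http://localhost:8070"  # matches the --grobid-uri default above

def grobid_process_fulltext(pdf_bytes):
    """POST a PDF to GROBID and return the TEI XML response body."""
    r = requests.post(
        GROBID_URI + "/api/processFulltextDocument",
        files={'input': pdf_bytes},
    )
    r.raise_for_status()
    return r.text
```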
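The mapper stores dict-valued columns as JSON and packs file:size and grobid0:status_code as big-endian int64 via struct.pack('!q', ...); the bare struct.unpack("") asserts in test_mapper_lines are still placeholders for the matching decode. A small round-trip sketch of that encoding (encode_cell and decode_int_cell are illustrative helpers, not in the diff):

```python
import json
import struct

def encode_cell(value):
    """JSON-encode dicts, pack ints as network-order int64, pass other values through."""
    if isinstance(value, dict):
        return json.dumps(value, sort_keys=True, indent=None).encode('utf-8')
    if isinstance(value, int):
        return struct.pack('!q', value)
    return value

def decode_int_cell(raw):
    """Inverse of the int64 packing; what the size/status_code asserts would need."""
    return struct.unpack('!q', raw)[0]

assert decode_int_cell(encode_cell(854156)) == 854156
assert json.loads(encode_cell({'status': 'success'}).decode('utf-8')) == {'status': 'success'}
```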
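xml2json.py and the TODO'd test_tei_json_convert point toward an xmltodict-based TEI-XML-to-JSON conversion. A minimal sketch of that step, assuming a plain xmltodict.parse() is sufficient (tei_to_json and the sample XML are hypothetical):

```python
import json
import xmltodict

def tei_to_json(tei_xml):
    """Parse a TEI XML string into a plain dict via xmltodict, then serialize to JSON."""
    doc = xmltodict.parse(tei_xml, process_namespaces=False)
    return json.dumps(doc)

sample_tei = "<TEI><teiHeader><fileDesc/></teiHeader><text><body><p>lorem ipsum</p></body></text></TEI>"
print(tei_to_json(sample_tei))
```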
