diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/common.py | 26 | ||||
-rwxr-xr-x | python/extraction_ungrobided.py | 136 | ||||
-rw-r--r-- | python/tests/test_extraction_ungrobided.py | 126 |
3 files changed, 288 insertions, 0 deletions
diff --git a/python/common.py b/python/common.py index 6710044..e596b35 100644 --- a/python/common.py +++ b/python/common.py @@ -1,4 +1,5 @@ +import json from datetime import datetime NORMAL_MIME = ( @@ -71,3 +72,28 @@ def parse_cdx_line(raw_cdx): # 'i' intentionally not set heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1) return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix} + +def parse_ungrobided_line(raw_line): + + line = raw_line.strip().split("\t") + if len(line) != 4: + return None + + key = line[0] + mime = normalize_mime(line[2]) + try: + f_c = json.loads(line[1]) + cdx = json.loads(line[3]) + except json.JSONDecodeError: + return None + + if not (key[5:].isalnum() and len(key) == 37 and mime != None): + print(mime) + print(key) + print("FAIL") + return None + + if '-' in (key, mime, f_c, cdx): + return None + + return {'key': key, 'file:mime': mime, 'file:cdx': cdx, 'f:c': f_c} diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py new file mode 100755 index 0000000..aedf715 --- /dev/null +++ b/python/extraction_ungrobided.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +""" +Variant of extraction_cdx_grobid which takes a partial metadata list as input +instead of CDX. + +This task list is dumped by another Hadoop job which scans over the HBase table +quickly, which allows this job to skip a (relatively) expensive HBase read +per-row. + +Requires: +- happybase +- mrjob +- wayback/GWB libraries +""" + +# XXX: some broken MRO thing going on in here due to python3 object wrangling +# in `wayback` library. Means we can't run pylint. +# pylint: skip-file + +import xml +import json +import raven +import struct +import mrjob +from common import parse_ungrobided_line +from extraction_cdx_grobid import MRExtractCdxGrobid, KEY_BLACKLIST, \ + sentry_client + + +class MRExtractUnGrobided(MRExtractCdxGrobid): + + # CDX lines in; JSON status out + #HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat' + #INPUT_PROTOCOL = mrjob.protocol.RawProtocol + INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol + OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol + + def parse_line(self, raw_line): + """Line should be TSV and have non-null fields: + + - key (string) + - f:c (string, json) + - file:mime (string) + - file:cdx (string, json) + """ + + if (raw_line.startswith(' ') or raw_line.startswith('#')): + return None, dict(status="invalid", reason="line prefix", input=raw_line) + + info = parse_ungrobided_line(raw_line) + if info is None: + return None, dict(status="invalid", reason="ungrobided parse") + + if info['file:mime'] not in self.mime_filter: + return None, dict(status="skip", reason="mimetype", mimetype=info['file:mime']) + + # If warc is not item/file.(w)arc.gz form, skip it + if len(info['file:cdx']['warc'].split('/')) != 2: + return None, dict(status="skip", reason="WARC path not petabox item/file", path=info['file:cdx']['warc']) + + return info, None + + @sentry_client.capture_exceptions + def mapper(self, _, raw_line): + """ + 1. parse filtered line + 2. fetch data from wayback + 3. submit to GROBID + 4. convert GROBID response to JSON (and metadata) + 6. determine "quality" + 6. push results to hbase + """ + + self.increment_counter('lines', 'total') + + # Parse line and filter down + info, status = self.parse_line(raw_line) + if info is None: + self.increment_counter('lines', status['status']) + yield _, status + return + key = info['key'] + if key in KEY_BLACKLIST: + self.increment_counter('lines', 'blacklist') + yield _, dict(status='blacklist', key=key) + return + + # Note: this may not get "cleared" correctly + sentry_client.extra_context(dict(row_key=key)) + + # Do the extraction + info, status = self.extract(info) + if info is None: + self.increment_counter('lines', status['status']) + status['key'] = key + yield _, status + return + extraction_status = status + + # Decide what to bother inserting back into HBase + # Basically, don't overwrite backfill fields. + grobid_status_code = info.get('grobid0:status_code', None) + for k in list(info.keys()): + if k.encode('utf-8') in ('f:c', 'file:mime', 'file:cdx'): + info.pop(k) + + # Convert fields to binary + for k in list(info.keys()): + if info[k] is None: + info.pop(k) + # NOTE: we're not actually sending these f:*, file:* keys... + elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json', + 'grobid0:metadata'): + assert type(info[k]) == dict + info[k] = json.dumps(info[k], sort_keys=True, indent=None) + elif k in ('file:size', 'grobid0:status_code'): + # encode as int64 in network byte order + if info[k] != {} and info[k] != None: + info[k] = struct.pack('!q', info[k]) + + key = info.pop('key') + self.hb_table.put(key, info) + self.increment_counter('lines', 'success') + + if extraction_status is not None: + yield _, dict(status="partial", key=key, + grobid_status_code=grobid_status_code, + reason=extraction_status['reason']) + else: + yield _, dict(status="success", + grobid_status_code=grobid_status_code, key=key, + extra=extraction_status) + + +if __name__ == '__main__': # pragma: no cover + MRExtractCdxGrobid.run() diff --git a/python/tests/test_extraction_ungrobided.py b/python/tests/test_extraction_ungrobided.py new file mode 100644 index 0000000..fb681c2 --- /dev/null +++ b/python/tests/test_extraction_ungrobided.py @@ -0,0 +1,126 @@ + +import io +import json +import mrjob +import pytest +import struct +import responses +import happybase_mock +import wayback.exception +from unittest import mock +from common import parse_ungrobided_line +from extraction_ungrobided import MRExtractUnGrobided + + +FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843) +OK_UNGROBIDED_LINE = b"\t".join(( + b"sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ", + b"""{"c": 1, "d": "2017-07-06T07:54:11", "f": "CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz", "o": 914718776, "u": "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250"}""", + b"application/pdf", + b"""{"c_size": 501, "dt": "20170706075411", "offset": 914718776, "surt": "org,ibc7)/article/file_down.php?mode=article_print&pid=250", "url": "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250", "warc": "CITESEERX-CRAWL-2017-06-20-20170706074206206-00379-00388-wbgrp-svc285/CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz"}""", +)) + +with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f: + REAL_TEI_XML = f.read() + +@pytest.fixture +def job(): + """ + Note: this mock only seems to work with job.run_mapper(), not job.run(); + the later results in a separate instantiation without the mock? + """ + job = MRExtractUnGrobided(['--no-conf', '-']) + + conn = happybase_mock.Connection() + conn.create_table('wbgrp-journal-extract-test', + {'file': {}, 'grobid0': {}, 'f': {}}) + job.hb_table = conn.table('wbgrp-journal-extract-test') + + return job + + +@mock.patch('extraction_ungrobided.MRExtractUnGrobided.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) +@responses.activate +def test_mapper_lines(mock_fetch, job): + + responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, + body=REAL_TEI_XML, content_type='text/xml') + + raw = io.BytesIO(OK_UNGROBIDED_LINE) + + output = io.BytesIO() + job.sandbox(stdin=raw, stdout=output) + + job.run_mapper() + + # for debugging tests + #print(output.getvalue().decode('utf-8')) + #print(list(job.hb_table.scan())) + + # wayback gets FETCH 1x times + mock_fetch.assert_called_once_with( + "CITESEERX-CRAWL-2017-06-20-20170706074206206-00379-00388-wbgrp-svc285/CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz", + 914718776, + 501) + + # grobid gets POST 1x times + assert len(responses.calls) == 1 + + # HBase + assert job.hb_table.row(b'1') == {} + + # Saved extraction info + row = job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') + + assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES) + assert row[b'file:mime'] == b"application/pdf" + assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 + # TODO: assert row[b'grobid0:quality'] == None + status = json.loads(row[b'grobid0:status'].decode('utf-8')) + assert type(status) == type(dict()) + assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML + tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8')) + metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8')) + assert tei_json['title'] == metadata['title'] + assert 'body' in tei_json + assert 'body' not in metadata + +def test_parse_ungrobided_invalid(job): + + print("space-prefixed line") + raw = " com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" + info, status = job.parse_line(raw) + assert info is None + assert status['status'] == "invalid" + assert 'prefix' in status['reason'] + + print("commented line") + raw = "#com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" + info, status = job.parse_line(raw) + assert info is None + assert status['status'] == "invalid" + assert 'prefix' in status['reason'] + + print("wrong column count") + raw = "a b c d e" + info, status = job.parse_line(raw) + assert info is None + assert status['status'] == "invalid" + assert 'parse' in status['reason'] + + print("CDX line, somehow") + raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf - 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" + info, status = job.parse_line(raw) + assert info is None + print(status) + assert status['status'] == "invalid" + assert 'parse' in status['reason'] + +def test_parse_ungrobided_valid(): + + parsed = parse_ungrobided_line(OK_UNGROBIDED_LINE.decode('utf-8')) + assert parsed['key'] == "sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ" + assert parsed['f:c']['u'] == "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250" + assert parsed['file:mime'] == "application/pdf" + assert parsed['file:cdx']['c_size'] == 501 + assert parsed['file:cdx']['dt'] == "20170706075411" |