diff options
-rwxr-xr-x | python/extraction_ungrobided.py | 135 | ||||
-rw-r--r-- | python/tests/test_extraction_ungrobided.py | 8 |
2 files changed, 136 insertions, 7 deletions
diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py index ef98eff..8224dbb 100755 --- a/python/extraction_ungrobided.py +++ b/python/extraction_ungrobided.py @@ -21,13 +21,21 @@ import xml import json import raven import struct +import requests +import happybase import mrjob +from mrjob.job import MRJob +import wayback.exception +from wayback.resource import Resource +from wayback.resource import ArcResource +from wayback.resourcestore import ResourceStore +from gwb.loader import CDXLoaderFactory from common import parse_ungrobided_line -from extraction_cdx_grobid import MRExtractCdxGrobid, KEY_BLACKLIST, \ - sentry_client +from grobid2json import teixml2json +from extraction_cdx_grobid import KEY_BLACKLIST, sentry_client -class MRExtractUnGrobided(MRExtractCdxGrobid): +class MRExtractUnGrobided(MRJob): # "ungrobided" TSV lines in; JSON status out #HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat' @@ -35,6 +43,54 @@ class MRExtractUnGrobided(MRExtractCdxGrobid): INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol + def configure_args(self): + super(MRExtractUnGrobided, self).configure_args() + + self.add_passthru_arg('--hbase-table', + type=str, + default='wbgrp-journal-extract-0-qa', + help='HBase table to backfill into (must exist)') + self.add_passthru_arg('--hbase-host', + type=str, + default='localhost', + help='HBase thrift API host to connect to') + self.add_passthru_arg('--grobid-uri', + type=str, + default='http://localhost:8070', + help='URI of GROBID API Server') + self.add_passthru_arg('--warc-uri-prefix', + type=str, + default='https://archive.org/serve/', + help='URI where WARCs can be found') + self.add_passthru_arg('--force-existing', + action="store_true", + help='Re-processes (with GROBID) existing lines') + + def __init__(self, *args, **kwargs): + super(MRExtractUnGrobided, self).__init__(*args, **kwargs) + self.mime_filter = ['application/pdf'] + self.hb_table = None + + def grobid_process_fulltext(self, content): + r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument", + files={'input': content}) + return r + + def mapper_init(self): + + if self.hb_table: + return + + sentry_client.tags_context(dict(hbase_table=self.options.hbase_table)) + try: + host = self.options.hbase_host + # TODO: make these configs accessible from... mrconf.cfg? + hb_conn = happybase.Connection(host=host, transport="framed", + protocol="compact") + except Exception: + raise Exception("Couldn't connect to HBase using host: {}".format(host)) + self.hb_table = hb_conn.table(self.options.hbase_table) + def parse_ungrobided_line(self, raw_line): """Line should be TSV and have non-null fields: @@ -60,6 +116,79 @@ class MRExtractUnGrobided(MRExtractCdxGrobid): return info, None + def fetch_warc_content(self, warc_path, offset, c_size): + warc_uri = self.options.warc_uri_prefix + warc_path + try: + rstore = ResourceStore(loaderfactory=CDXLoaderFactory()) + gwb_record = rstore.load_resource(warc_uri, offset, c_size) + except wayback.exception.ResourceUnavailable: + return None, dict(status="error", + reason="failed to load file contents from wayback/petabox") + + if gwb_record.get_status()[0] != 200: + return None, dict(status="error", + reason="archived HTTP response (WARC) was not 200", + warc_status=gwb_record.get_status()[0]) + return gwb_record.open_raw_content().read(), None + + def extract(self, info): + + # Fetch data from WARCs in petabox + original_content, status = self.fetch_warc_content( + info['file:cdx']['warc'], + info['file:cdx']['offset'], + info['file:cdx']['c_size']) + if status: + return None, status + + info['file:size'] = len(original_content) + + # Submit to GROBID + try: + grobid_response = self.grobid_process_fulltext(original_content) + except requests.exceptions.ConnectionError: + return None, dict(status="error", reason="connection to GROBID worker") + + info['grobid0:status_code'] = grobid_response.status_code + + # 4 MByte XML size limit; don't record GROBID status on this path + if len(grobid_response.content) > 4000000: + info['grobid0:status'] = {'status': 'oversize'} + return info, dict(status="oversize", reason="TEI response was too large") + + if grobid_response.status_code != 200: + # response.text is .content decoded as utf-8 + info['grobid0:status'] = dict(status='error', description=grobid_response.text) + return info, dict(status="error", reason="non-200 GROBID HTTP status", + extra=grobid_response.text) + + info['grobid0:status'] = {'status': 'partial'} + info['grobid0:tei_xml'] = grobid_response.content + + # Convert TEI XML to JSON + try: + info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True) + except xml.etree.ElementTree.ParseError: + info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error") + return info, info['grobid0:status'] + except ValueError: + info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content") + return info, info['grobid0:status'] + + tei_metadata = info['grobid0:tei_json'].copy() + for k in ('body', 'annex'): + # Remove fulltext (copywritted) content + tei_metadata.pop(k, None) + info['grobid0:metadata'] = tei_metadata + + # Determine extraction "quality" + # TODO: + + info['grobid0:quality'] = None + info['grobid0:status'] = {'status': 'success'} + + return info, None + @sentry_client.capture_exceptions def mapper(self, _, raw_line): """ diff --git a/python/tests/test_extraction_ungrobided.py b/python/tests/test_extraction_ungrobided.py index 0ec47bc..366d392 100644 --- a/python/tests/test_extraction_ungrobided.py +++ b/python/tests/test_extraction_ungrobided.py @@ -139,28 +139,28 @@ def test_parse_ungrobided_invalid(job): print("space-prefixed line") raw = " com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.parse_line(raw) + info, status = job.parse_ungrobided_line(raw) assert info is None assert status['status'] == "invalid" assert 'prefix' in status['reason'] print("commented line") raw = "#com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.parse_line(raw) + info, status = job.parse_ungrobided_line(raw) assert info is None assert status['status'] == "invalid" assert 'prefix' in status['reason'] print("wrong column count") raw = "a b c d e" - info, status = job.parse_line(raw) + info, status = job.parse_ungrobided_line(raw) assert info is None assert status['status'] == "invalid" assert 'parse' in status['reason'] print("CDX line, somehow") raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf - 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.parse_line(raw) + info, status = job.parse_ungrobided_line(raw) assert info is None print(status) assert status['status'] == "invalid" |