aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xpython/extraction_ungrobided.py135
-rw-r--r--python/tests/test_extraction_ungrobided.py8
2 files changed, 136 insertions, 7 deletions
diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py
index ef98eff..8224dbb 100755
--- a/python/extraction_ungrobided.py
+++ b/python/extraction_ungrobided.py
@@ -21,13 +21,21 @@ import xml
import json
import raven
import struct
+import requests
+import happybase
import mrjob
+from mrjob.job import MRJob
+import wayback.exception
+from wayback.resource import Resource
+from wayback.resource import ArcResource
+from wayback.resourcestore import ResourceStore
+from gwb.loader import CDXLoaderFactory
from common import parse_ungrobided_line
-from extraction_cdx_grobid import MRExtractCdxGrobid, KEY_BLACKLIST, \
- sentry_client
+from grobid2json import teixml2json
+from extraction_cdx_grobid import KEY_BLACKLIST, sentry_client
-class MRExtractUnGrobided(MRExtractCdxGrobid):
+class MRExtractUnGrobided(MRJob):
# "ungrobided" TSV lines in; JSON status out
#HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat'
@@ -35,6 +43,54 @@ class MRExtractUnGrobided(MRExtractCdxGrobid):
INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
+ def configure_args(self):
+ super(MRExtractUnGrobided, self).configure_args()
+
+ self.add_passthru_arg('--hbase-table',
+ type=str,
+ default='wbgrp-journal-extract-0-qa',
+ help='HBase table to backfill into (must exist)')
+ self.add_passthru_arg('--hbase-host',
+ type=str,
+ default='localhost',
+ help='HBase thrift API host to connect to')
+ self.add_passthru_arg('--grobid-uri',
+ type=str,
+ default='http://localhost:8070',
+ help='URI of GROBID API Server')
+ self.add_passthru_arg('--warc-uri-prefix',
+ type=str,
+ default='https://archive.org/serve/',
+ help='URI where WARCs can be found')
+ self.add_passthru_arg('--force-existing',
+ action="store_true",
+ help='Re-processes (with GROBID) existing lines')
+
+ def __init__(self, *args, **kwargs):
+ super(MRExtractUnGrobided, self).__init__(*args, **kwargs)
+ self.mime_filter = ['application/pdf']
+ self.hb_table = None
+
+ def grobid_process_fulltext(self, content):
+ r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
+ files={'input': content})
+ return r
+
+ def mapper_init(self):
+
+ if self.hb_table:
+ return
+
+ sentry_client.tags_context(dict(hbase_table=self.options.hbase_table))
+ try:
+ host = self.options.hbase_host
+ # TODO: make these configs accessible from... mrconf.cfg?
+ hb_conn = happybase.Connection(host=host, transport="framed",
+ protocol="compact")
+ except Exception:
+ raise Exception("Couldn't connect to HBase using host: {}".format(host))
+ self.hb_table = hb_conn.table(self.options.hbase_table)
+
def parse_ungrobided_line(self, raw_line):
"""Line should be TSV and have non-null fields:
@@ -60,6 +116,79 @@ class MRExtractUnGrobided(MRExtractCdxGrobid):
return info, None
+ def fetch_warc_content(self, warc_path, offset, c_size):
+ warc_uri = self.options.warc_uri_prefix + warc_path
+ try:
+ rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
+ gwb_record = rstore.load_resource(warc_uri, offset, c_size)
+ except wayback.exception.ResourceUnavailable:
+ return None, dict(status="error",
+ reason="failed to load file contents from wayback/petabox")
+
+ if gwb_record.get_status()[0] != 200:
+ return None, dict(status="error",
+ reason="archived HTTP response (WARC) was not 200",
+ warc_status=gwb_record.get_status()[0])
+ return gwb_record.open_raw_content().read(), None
+
+ def extract(self, info):
+
+ # Fetch data from WARCs in petabox
+ original_content, status = self.fetch_warc_content(
+ info['file:cdx']['warc'],
+ info['file:cdx']['offset'],
+ info['file:cdx']['c_size'])
+ if status:
+ return None, status
+
+ info['file:size'] = len(original_content)
+
+ # Submit to GROBID
+ try:
+ grobid_response = self.grobid_process_fulltext(original_content)
+ except requests.exceptions.ConnectionError:
+ return None, dict(status="error", reason="connection to GROBID worker")
+
+ info['grobid0:status_code'] = grobid_response.status_code
+
+ # 4 MByte XML size limit; don't record GROBID status on this path
+ if len(grobid_response.content) > 4000000:
+ info['grobid0:status'] = {'status': 'oversize'}
+ return info, dict(status="oversize", reason="TEI response was too large")
+
+ if grobid_response.status_code != 200:
+ # response.text is .content decoded as utf-8
+ info['grobid0:status'] = dict(status='error', description=grobid_response.text)
+ return info, dict(status="error", reason="non-200 GROBID HTTP status",
+ extra=grobid_response.text)
+
+ info['grobid0:status'] = {'status': 'partial'}
+ info['grobid0:tei_xml'] = grobid_response.content
+
+ # Convert TEI XML to JSON
+ try:
+ info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True)
+ except xml.etree.ElementTree.ParseError:
+ info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error")
+ return info, info['grobid0:status']
+ except ValueError:
+ info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content")
+ return info, info['grobid0:status']
+
+ tei_metadata = info['grobid0:tei_json'].copy()
+ for k in ('body', 'annex'):
+ # Remove fulltext (copywritted) content
+ tei_metadata.pop(k, None)
+ info['grobid0:metadata'] = tei_metadata
+
+ # Determine extraction "quality"
+ # TODO:
+
+ info['grobid0:quality'] = None
+ info['grobid0:status'] = {'status': 'success'}
+
+ return info, None
+
@sentry_client.capture_exceptions
def mapper(self, _, raw_line):
"""
diff --git a/python/tests/test_extraction_ungrobided.py b/python/tests/test_extraction_ungrobided.py
index 0ec47bc..366d392 100644
--- a/python/tests/test_extraction_ungrobided.py
+++ b/python/tests/test_extraction_ungrobided.py
@@ -139,28 +139,28 @@ def test_parse_ungrobided_invalid(job):
print("space-prefixed line")
raw = " com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.parse_line(raw)
+ info, status = job.parse_ungrobided_line(raw)
assert info is None
assert status['status'] == "invalid"
assert 'prefix' in status['reason']
print("commented line")
raw = "#com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.parse_line(raw)
+ info, status = job.parse_ungrobided_line(raw)
assert info is None
assert status['status'] == "invalid"
assert 'prefix' in status['reason']
print("wrong column count")
raw = "a b c d e"
- info, status = job.parse_line(raw)
+ info, status = job.parse_ungrobided_line(raw)
assert info is None
assert status['status'] == "invalid"
assert 'parse' in status['reason']
print("CDX line, somehow")
raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf - 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.parse_line(raw)
+ info, status = job.parse_ungrobided_line(raw)
assert info is None
print(status)
assert status['status'] == "invalid"