Diffstat (limited to 'extraction/extraction_cdx_grobid.py')
-rwxr-xr-x | extraction/extraction_cdx_grobid.py | 248
1 file changed, 0 insertions, 248 deletions
diff --git a/extraction/extraction_cdx_grobid.py b/extraction/extraction_cdx_grobid.py
deleted file mode 100755
index 27668ea..0000000
--- a/extraction/extraction_cdx_grobid.py
+++ /dev/null
@@ -1,248 +0,0 @@
-#!/usr/bin/env python3
-"""
-Streaming Hadoop script to extract metadata and body text from fulltext (eg,
-PDF) files using GROBID. Input is a CDX file; results primarily go to HBase,
-with status written to a configurable output stream.
-
-Fulltext files are loaded directly from WARC files in petabox, instead of going
-through the wayback replay.
-
-Requires:
-- happybase
-- mrjob
-- wayback/GWB libraries
-"""
-
-import io
-import sys
-import json
-import struct
-import requests
-import happybase
-import mrjob
-from datetime import datetime
-from mrjob.job import MRJob
-from wayback.resource import Resource
-from wayback.resource import ArcResource
-from wayback.resourcestore import ResourceStore
-from gwb.loader import CDXLoaderFactory
-
-
-def normalize_mime(raw):
-    # Minimal normalization: lowercase and strip any parameters (eg, charset)
-    return raw.lower().split(';')[0].strip()
-
-
-def parse_cdx_line(raw_cdx):
-
-    cdx = raw_cdx.split()
-    if len(cdx) < 11:
-        return None
-
-    surt = cdx[0]
-    dt = cdx[1]
-    url = cdx[2]
-    mime = normalize_mime(cdx[3])
-    http_status = cdx[4]
-    key = cdx[5]
-    c_size = cdx[8]
-    offset = cdx[9]
-    warc = cdx[10]
-
-    if not (key.isalnum() and c_size.isdigit() and offset.isdigit()
-            and http_status == "200" and len(key) == 32 and dt.isdigit()):
-        return None
-
-    if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc):
-        return None
-
-    key = "sha1:{}".format(key)
-
-    info = dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
-        offset=int(offset), warc=warc)
-
-    warc_file = warc.split('/')[-1]
-    try:
-        dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
-    except ValueError:
-        return None
-
-    # 'i' intentionally not set
-    heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
-    return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix}
-
-
-class MRExtractCdxGrobid(MRJob):
-
-    # CDX lines in; JSON status out
-    INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
-    OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
-
-    def configure_args(self):
-        super(MRExtractCdxGrobid, self).configure_args()
-
-        self.add_passthru_arg('--hbase-table',
-                              type=str,
-                              default='wbgrp-journal-extract-0-qa',
-                              help='HBase table to backfill into (must exist)')
-        self.add_passthru_arg('--hbase-host',
-                              type=str,
-                              default='localhost',
-                              help='HBase thrift API host to connect to')
-        self.add_passthru_arg('--grobid-uri',
-                              type=str,
-                              default='http://localhost:8070',
-                              help='URI of GROBID API Server')
-
-    def __init__(self, *args, **kwargs):
-
-        # Allow passthrough for tests
-        if 'hb_table' in kwargs:
-            self.hb_table = kwargs.pop('hb_table')
-        else:
-            self.hb_table = None
-
-        super(MRExtractCdxGrobid, self).__init__(*args, **kwargs)
-        self.mime_filter = ['application/pdf']
-
-    def grobid_fulltext(self, content):
-        r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
-                files={'input': content})
-        if r.status_code != 200:
-            # XXX: better error handling (retry? raise?)
-            print("FAIL (GROBID {}): {}".format(r.status_code,
-                r.content.decode('utf8')))
-        # response body is TEI XML
-        return r.text
-
-    def mapper_init(self):
-
-        if self.hb_table is None:
-            try:
-                host = self.options.hbase_host
-                # TODO: make these configs accessible from... mrconf.cfg?
-                hb_conn = happybase.Connection(host=host, transport="framed",
-                        protocol="compact")
-            except Exception as err:
-                raise Exception("Couldn't connect to HBase using host: {}".format(host))
-            self.hb_table = hb_conn.table(self.options.hbase_table)
-
-    def parse_line(self, raw_cdx):
-
-        if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or
-                raw_cdx.startswith('#')):
-            return None, dict(status="invalid", reason="line prefix")
-
-        info = parse_cdx_line(raw_cdx)
-        if info is None:
-            return None, dict(status="invalid", reason="CDX parse")
-
-        if info['file:mime'] not in self.mime_filter:
-            self.increment_counter('lines', 'skip')
-            return None, dict(status="skip", reason="mimetype")
-
-        # If warc is not item/file.(w)arc.gz form, skip it
-        if len(info['file:cdx']['warc'].split('/')) != 2:
-            self.increment_counter('lines', 'skip')
-            return None, dict(status="skip", reason="WARC path not petabox item/file")
-
-        return info, None
-
-    def extract(self, info):
-
-        # Fetch data from WARCs in petabox
-        try:
-            rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
-            gwb_record = rstore.load_resource(
-                info['file:cdx']['warc'],
-                info['file:cdx']['offset'],
-                info['file:cdx']['c_size'])
-        except IOError as ioe:
-            # XXX: catch correct error
-            self.increment_counter('lines', 'error')
-            return None, dict(status="error", reason="WARC fetch")
-
-        if gwb_record.get_status()[0] != 200:
-            self.increment_counter('lines', 'error')
-            return None, dict(status="error", reason="non-HTTP-200 WARC content")
-
-        # Submit to GROBID
-        content = gwb_record.open_raw_content().read()
-        try:
-            grobid_result = self.grobid_fulltext(content)
-        except IOError as ioe:
-            # XXX: catch correct error
-            self.increment_counter('lines', 'error')
-            return None, dict(status="error", reason="GROBID connection")
-
-        info['file:size'] = len(content)
-
-        info['grobid0:status_code'] = None
-        info['grobid0:quality'] = None
-        info['grobid0:status'] = {}
-        info['grobid0:tei_xml'] = None
-        info['grobid0:tei_json'] = {}
-        info['grobid0:metadata'] = {}
-
-        # Convert TEI XML to JSON
-        # TODO
-
-        # Determine extraction "quality"
-        # TODO
-
-        return info, None
-
-    def mapper(self, _, raw_cdx):
-        """
-        1. parse CDX line
-        2. check what is already in hbase
-        3. fetch data from wayback
-        4. submit to GROBID
-        5. convert GROBID response to JSON (and metadata)
-        6. determine "quality"
-        7. push results to hbase
-        """
-
-        self.increment_counter('lines', 'total')
-
-        # Parse line and filter down
-        info, status = self.parse_line(raw_cdx)
-        if info is None:
-            self.increment_counter('lines', status['status'])
-            yield _, status
-            return
-
-        # Check if we've already processed this line
-        oldrow = self.hb_table.row(info['key'], columns=[b'f', b'file',
-            b'grobid0:status_code'])
-        if oldrow.get(b'grobid0:status_code', None):
-            # This file has already been processed; skip it
-            self.increment_counter('lines', 'existing')
-            yield _, dict(status="existing")
-            return
-
-        # Do the extraction
-        info, status = self.extract(info)
-        if info is None:
-            self.increment_counter('lines', status['status'])
-            yield _, status
-            return
-
-        # Decide what to bother inserting back into HBase
-        # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')
-        grobid_status = info.get('grobid0:status_code', None)
-        for k in list(info.keys()):
-            if k.encode('utf-8') in oldrow:
-                info.pop(k)
-
-        # Convert fields to binary
-        for k in info.keys():
-            if k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
-                    'grobid0:metadata'):
-                assert type(info[k]) == dict
-                info[k] = json.dumps(info[k], sort_keys=True, indent=None)
-            if k in ('file:size', 'grobid0:status_code') and info[k] is not None:
-                # encode as int64 in network byte order
-                info[k] = struct.pack('!q', info[k])
-
-        key = info.pop('key')
-        self.hb_table.put(key, info)
-        self.increment_counter('lines', 'success')
-
-        yield _, dict(status="success", grobid_status=grobid_status)
-
-if __name__ == '__main__':
-    MRExtractCdxGrobid.run()
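
For reference, a worked example of the CDX parsing step: given one CDX-11 line, parse_cdx_line() returns the row dict the mapper operates on. This is a minimal sketch, assuming the deleted module above is importable; the SURT, digest, sizes, and WARC path are invented for illustration.

    from extraction_cdx_grobid import parse_cdx_line

    # Hypothetical CDX-11 line: SURT, timestamp, URL, mimetype, HTTP status,
    # 32-char SHA-1 digest, redirect, robot flags, compressed size, offset,
    # and WARC file in petabox item/file form.
    raw = ("org,example)/paper.pdf 20170828233154 http://example.org/paper.pdf "
           "application/pdf 200 ABCDEF1234ABCDEF1234ABCDEF1234AB - - 12345 "
           "98765 EXAMPLE-CRAWL-2017/EXAMPLE-20170828-00001.warc.gz")

    row = parse_cdx_line(raw)
    assert row['key'] == 'sha1:ABCDEF1234ABCDEF1234ABCDEF1234AB'
    assert row['file:cdx'] == dict(surt='org,example)/paper.pdf',
        dt='20170828233154', url='http://example.org/paper.pdf',
        c_size=12345, offset=98765,
        warc='EXAMPLE-CRAWL-2017/EXAMPLE-20170828-00001.warc.gz')
    # heritrix-style 'f:c' column carries the ISO-formatted timestamp
    assert row['f:c']['d'] == '2017-08-28T23:31:54'

Only rows whose normalized mimetype is in the mime_filter (by default just application/pdf) survive parse_line() and reach extraction.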
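The GROBID submission in grobid_fulltext() is a plain HTTP POST to GROBID's processFulltextDocument endpoint, which responds with TEI XML. A standalone sketch of the same call, assuming a GROBID server at the job's default --grobid-uri (localhost:8070); 'paper.pdf' is a placeholder filename:

    import requests

    # POST a PDF to GROBID's fulltext extraction endpoint
    with open('paper.pdf', 'rb') as f:
        r = requests.post('http://localhost:8070/api/processFulltextDocument',
                          files={'input': f})
    print(r.status_code)  # 200 on success
    print(r.text[:200])   # response body is TEI XML, not JSON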
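The "convert fields to binary" step at the end of the mapper serializes dict-valued columns as canonical JSON and integer columns as big-endian int64 before the HBase put. Concretely (values arbitrary):

    import json
    import struct

    # Integer columns ('file:size', 'grobid0:status_code') become 8-byte,
    # network-byte-order (big-endian) values:
    packed = struct.pack('!q', 12345)
    assert packed == b'\x00\x00\x00\x00\x00\x0009'
    assert struct.unpack('!q', packed)[0] == 12345

    # Dict columns ('f:c', 'file:cdx', 'grobid0:*') become JSON with sorted
    # keys, so identical dicts always serialize to identical bytes:
    assert json.dumps({'b': 1, 'a': 2}, sort_keys=True, indent=None) == '{"a": 2, "b": 1}'

Since the class calls MRJob.run() under __main__, the script runs like any mrjob job (eg, python extraction_cdx_grobid.py --hbase-host ... input.cdx), assuming reachable HBase Thrift and GROBID services; hosts and paths there are placeholders.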