Diffstat (limited to 'mapreduce/extraction_cdx_grobid.py')
-rwxr-xr-x  mapreduce/extraction_cdx_grobid.py | 87
1 file changed, 50 insertions(+), 37 deletions(-)
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py
index ea36e6e..0ba95e6 100755
--- a/mapreduce/extraction_cdx_grobid.py
+++ b/mapreduce/extraction_cdx_grobid.py
@@ -15,16 +15,19 @@ Requires:
import io
import sys
+import json
import struct
import requests
import happybase
import mrjob
from mrjob.job import MRJob
+import wayback.exception
from wayback.resource import Resource
from wayback.resource import ArcResource
from wayback.resourcestore import ResourceStore
from gwb.loader import CDXLoaderFactory
from common import parse_cdx_line
+from grobid2json import do_tei
class MRExtractCdxGrobid(MRJob):
@@ -60,16 +63,13 @@ class MRExtractCdxGrobid(MRJob):
super(MRExtractCdxGrobid, self).__init__(*args, **kwargs)
self.mime_filter = ['application/pdf']
- def grobid_fulltext(self, content):
+ def grobid_process_fulltext(self, content):
r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
files={'input': content})
if r.status_code != 200:
# XXX:
- print("FAIL (Grobid: {}): {}".format(r.content.decode('utf8')))
- else:
- # XXX:
- print("SUCCESS: " + debug_line)
- return r.json()
+            # non-200: signal failure; extract() below checks for None
+            return None
+        return r
def mapper_init(self):
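
The renamed grobid_process_fulltext() wraps GROBID's REST endpoint. For reference, the same call as a standalone sketch outside the MRJob class; the endpoint path and 'input' form field are taken from the diff, while the timeout and example names are assumptions, not part of this commit:

    import requests

    def grobid_process_fulltext(grobid_uri, pdf_bytes):
        # POST the raw PDF bytes as multipart form data under the 'input'
        # field, mirroring the method above
        r = requests.post(grobid_uri + "/api/processFulltextDocument",
            files={'input': pdf_bytes},
            timeout=120.0)  # assumed; this commit sets no timeout
        if r.status_code != 200:
            return None
        return r  # r.content holds the TEI XML body
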
@@ -104,47 +104,56 @@ class MRExtractCdxGrobid(MRJob):
return info, None
- def extract(self, info):
-
- # Fetch data from WARCs in petabox
+ def fetch_warc_content(self, warc_path, offset, c_size):
try:
rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
- gwb_record = rstore.load_resource(
- info['file:cdx']['warc'],
- info['file:cdx']['offset'],
- info['file:cdx']['c_size'])
- except IOError as ioe:
- # XXX: catch correct error
- self.increment_counter('lines', 'existing')
- return _, dict(status="existing")
+ gwb_record = rstore.load_resource(warc_path, offset, c_size)
+ except wayback.exception.ResourceUnavailable as err:
+            # XXX: re-raise while testing; remove to fall through to the
+            # petabox_error status below
+            raise err
+ self.increment_counter('lines', 'petabox_error')
+ return None, dict(status="petabox_error", reason="failed to load file contents")
if gwb_record.get_status()[0] != 200:
self.increment_counter('lines', 'error')
- return _, dict(status="error", reason="non-HTTP-200 WARC content")
+ return None, dict(status="error", reason="non-HTTP-200 WARC content")
+        # return a (content, status) pair, matching the error returns above
+        return gwb_record.open_raw_content(), None
+
+ def extract(self, info):
+
+ # Fetch data from WARCs in petabox
+ content, status = self.fetch_warc_content(
+ info['file:cdx']['warc'],
+ info['file:cdx']['offset'],
+ info['file:cdx']['c_size'])
+ if status:
+ self.increment_counter('lines', status['status'])
+ return None, status
+
+ info['file:size'] = len(content)
# Submit to GROBID
- content = gwb_record.open_raw_content()
try:
- grobid_result = self.grobid_fulltext(gwb_record.open_raw_content())
+ grobid_response = self.grobid_process_fulltext(content)
except IOError as ioe:
+            raise ioe  # XXX: re-raise while testing, as in fetch_warc_content()
# XXX: catch correct error
- self.increment_counter('lines', 'existing')
- return _, dict(status="existing")
-
- info['file:size'] = len(resource_data)
+            self.increment_counter('lines', 'fail')
+            return None, dict(status="fail", reason="GROBID connection")
+
+        if grobid_response is None:
+            # grobid_process_fulltext() returns None on non-200 responses;
+            # bail out before dereferencing .status_code below
+            self.increment_counter('lines', 'fail')
+            return None, dict(status="fail", reason="GROBID non-200")
- info['grobid0:status_code'] = None
- info['grobid0:quality'] = None
- info['grobid0:status'] = {}
- info['grobid0:tei_xml'] = None
- info['grobid0:tei_json'] = {}
- info['grobid0:metadata'] = {}
+ info['grobid0:status_code'] = grobid_response.status_code
+ info['grobid0:tei_xml'] = grobid_response.content
+ info['grobid0:status'] = {} # TODO
# Convert TEI XML to JSON
- # TODO
+ # TODO:
+ info['grobid0:tei_json'] = do_tei(grobid_response.content, encumbered=True)
+ info['grobid0:metadata'] = do_tei(grobid_response.content, encumbered=False)
# Determine extraction "quality"
- # TODO
+ # TODO:
+
+ info['grobid0:quality'] = None
return info, None
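
fetch_warc_content() and extract() now share a (value, status) return convention: exactly one element of the pair is non-None, and failures carry a dict with at least a 'status' key. A self-contained stub illustrating how callers branch on it (the stub itself is illustrative, not from this commit):

    def fetch_warc_content_stub(ok):
        # stand-in with the same (value, status) contract as the method above
        if ok:
            return b"%PDF-1.4 ...", None
        return None, dict(status="error", reason="non-HTTP-200 WARC content")

    content, status = fetch_warc_content_stub(True)
    if status is not None:
        print("skipping record: " + status["reason"])
    else:
        print("fetched {} bytes".format(len(content)))
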
@@ -187,19 +196,22 @@ class MRExtractCdxGrobid(MRJob):
# Decide what to bother inserting back into HBase
# Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')
grobid_status = info.get('grobid0:status_code', None)
- for k in info.keys():
+ for k in list(info.keys()):
if k in oldrow:
info.pop(k)
# Convert fields to binary
- for k in info.keys():
- if k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
+ for k in list(info.keys()):
+            if info[k] is None:
+ info.pop(k)
+ elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
'grobid0:metadata'):
assert type(info[k]) == dict
info[k] = json.dumps(info[k], sort_keys=True, indent=None)
- if k in ('file:size', 'grobid0:status_code'):
+ elif k in ('file:size', 'grobid0:status_code'):
# encode as int64 in network byte order
- info[k] = struct.pack('!q', info[k])
+                if info[k] != {} and info[k] is not None:
+ info[k] = struct.pack('!q', info[k])
key = info.pop('key')
self.hb_table.put(key, info)
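
The serialization rules applied above are: dict-valued columns become canonical JSON, and integer columns become big-endian ("network byte order") int64 via struct.pack('!q', ...). A minimal round-trip sketch using only the stdlib; the decode side is an assumption about how consumers of the HBase row would unpack it:

    import json
    import struct

    size = 123456
    cdx = {'warc': 'example.warc.gz', 'offset': 0}

    # encode, as in the cleanup loop above
    size_enc = struct.pack('!q', size)  # exactly 8 bytes, big-endian signed
    cdx_enc = json.dumps(cdx, sort_keys=True, indent=None)

    # decode (assumed consumer side)
    assert struct.unpack('!q', size_enc)[0] == size
    assert json.loads(cdx_enc) == cdx
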
@@ -207,6 +219,7 @@ class MRExtractCdxGrobid(MRJob):
yield _, dict(status="success", grobid_status=grobid_status)
+
if __name__ == '__main__': # pragma: no cover
MRExtractCdxGrobid.run()
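
For local testing, the job can also be driven through mrjob's standard runner interface. The --grobid-uri flag is implied by self.options.grobid_uri above but defined outside this diff, localhost:8070 is only GROBID's conventional default port, and any HBase connection options set up in mapper_init() are omitted:

    # hypothetical local invocation via mrjob's runner interface
    job = MRExtractCdxGrobid(args=[
        '--grobid-uri', 'http://localhost:8070',
        'input.cdx'])
    with job.make_runner() as runner:
        runner.run()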