aboutsummaryrefslogtreecommitdiffstats
path: root/mapreduce/extraction_cdx_grobid.py
diff options
context:
space:
mode:
Diffstat (limited to 'mapreduce/extraction_cdx_grobid.py')
-rwxr-xr-xmapreduce/extraction_cdx_grobid.py21
1 files changed, 14 insertions, 7 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py
index 8fa1720..bcb8663 100755
--- a/mapreduce/extraction_cdx_grobid.py
+++ b/mapreduce/extraction_cdx_grobid.py
@@ -54,6 +54,10 @@ class MRExtractCdxGrobid(MRJob):
type=str,
default='http://localhost:8070',
help='URI of GROBID API Server')
+ self.add_passthru_arg('--warc-uri-prefix',
+ type=str,
+ default='https://archive.org/serve/',
+ help='URI where WARCs can be found')
def __init__(self, *args, **kwargs):
super(MRExtractCdxGrobid, self).__init__(*args, **kwargs)
@@ -105,9 +109,10 @@ class MRExtractCdxGrobid(MRJob):
return info, None
def fetch_warc_content(self, warc_path, offset, c_size):
+ warc_uri = self.options.warc_uri_prefix + warc_path
try:
rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
- gwb_record = rstore.load_resource(warc_path, offset, c_size)
+ gwb_record = rstore.load_resource(warc_uri, offset, c_size)
except wayback.exception.ResourceUnavailable as err:
# XXX: during testing
raise err
@@ -116,13 +121,15 @@ class MRExtractCdxGrobid(MRJob):
if gwb_record.get_status()[0] != 200:
self.increment_counter('lines', 'error')
- return None, dict(status="error", reason="non-HTTP-200 WARC content")
- return gwb_record.open_raw_content()
+ return None, dict(status="error",
+ reason="archived HTTP response (WARC) was not 200",
+ warc_status=gwb_record.get_status()[0])
+ return gwb_record.open_raw_content().read(), None
def extract(self, info):
# Fetch data from WARCs in petabox
- content, status = self.fetch_warc_content(
+ original_content, status = self.fetch_warc_content(
info['file:cdx']['warc'],
info['file:cdx']['offset'],
info['file:cdx']['c_size'])
@@ -130,11 +137,11 @@ class MRExtractCdxGrobid(MRJob):
self.increment_counter('lines', status['status'])
return None, status
- info['file:size'] = len(content)
+ info['file:size'] = len(original_content)
# Submit to GROBID
try:
- grobid_response = self.grobid_process_fulltext(content)
+ grobid_response = self.grobid_process_fulltext(original_content)
except IOError as ioe:
raise ioe
# XXX: catch correct error
@@ -179,7 +186,7 @@ class MRExtractCdxGrobid(MRJob):
# Check if we've already processed this line
oldrow = self.hb_table.row(info['key'], columns=['f', 'file',
- 'grobid:status_code'])
+ 'grobid0:status_code'])
if oldrow.get('grobid0:status', None):
# This file has already been processed; skip it
self.increment_counter('lines', 'existing')