diff options
-rwxr-xr-x | mapreduce/extraction_cdx_grobid.py | 21 |
1 files changed, 14 insertions, 7 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py index 8fa1720..bcb8663 100755 --- a/mapreduce/extraction_cdx_grobid.py +++ b/mapreduce/extraction_cdx_grobid.py @@ -54,6 +54,10 @@ class MRExtractCdxGrobid(MRJob): type=str, default='http://localhost:8070', help='URI of GROBID API Server') + self.add_passthru_arg('--warc-uri-prefix', + type=str, + default='https://archive.org/serve/', + help='URI where WARCs can be found') def __init__(self, *args, **kwargs): super(MRExtractCdxGrobid, self).__init__(*args, **kwargs) @@ -105,9 +109,10 @@ class MRExtractCdxGrobid(MRJob): return info, None def fetch_warc_content(self, warc_path, offset, c_size): + warc_uri = self.options.warc_uri_prefix + warc_path try: rstore = ResourceStore(loaderfactory=CDXLoaderFactory()) - gwb_record = rstore.load_resource(warc_path, offset, c_size) + gwb_record = rstore.load_resource(warc_uri, offset, c_size) except wayback.exception.ResourceUnavailable as err: # XXX: during testing raise err @@ -116,13 +121,15 @@ class MRExtractCdxGrobid(MRJob): if gwb_record.get_status()[0] != 200: self.increment_counter('lines', 'error') - return None, dict(status="error", reason="non-HTTP-200 WARC content") - return gwb_record.open_raw_content() + return None, dict(status="error", + reason="archived HTTP response (WARC) was not 200", + warc_status=gwb_record.get_status()[0]) + return gwb_record.open_raw_content().read(), None def extract(self, info): # Fetch data from WARCs in petabox - content, status = self.fetch_warc_content( + original_content, status = self.fetch_warc_content( info['file:cdx']['warc'], info['file:cdx']['offset'], info['file:cdx']['c_size']) @@ -130,11 +137,11 @@ class MRExtractCdxGrobid(MRJob): self.increment_counter('lines', status['status']) return None, status - info['file:size'] = len(content) + info['file:size'] = len(original_content) # Submit to GROBID try: - grobid_response = self.grobid_process_fulltext(content) + grobid_response = self.grobid_process_fulltext(original_content) except IOError as ioe: raise ioe # XXX: catch correct error @@ -179,7 +186,7 @@ class MRExtractCdxGrobid(MRJob): # Check if we've already processed this line oldrow = self.hb_table.row(info['key'], columns=['f', 'file', - 'grobid:status_code']) + 'grobid0:status_code']) if oldrow.get('grobid0:status', None): # This file has already been processed; skip it self.increment_counter('lines', 'existing') |