diff options
Diffstat (limited to 'mapreduce')
| -rwxr-xr-x | mapreduce/extraction_cdx_grobid.py | 21 | 
1 files changed, 14 insertions, 7 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py index 8fa1720..bcb8663 100755 --- a/mapreduce/extraction_cdx_grobid.py +++ b/mapreduce/extraction_cdx_grobid.py @@ -54,6 +54,10 @@ class MRExtractCdxGrobid(MRJob):                                type=str,                                default='http://localhost:8070',                                help='URI of GROBID API Server') +        self.add_passthru_arg('--warc-uri-prefix', +                              type=str, +                              default='https://archive.org/serve/', +                              help='URI where WARCs can be found')      def __init__(self, *args, **kwargs):          super(MRExtractCdxGrobid, self).__init__(*args, **kwargs) @@ -105,9 +109,10 @@ class MRExtractCdxGrobid(MRJob):          return info, None      def fetch_warc_content(self, warc_path, offset, c_size): +        warc_uri = self.options.warc_uri_prefix + warc_path          try:              rstore = ResourceStore(loaderfactory=CDXLoaderFactory()) -            gwb_record = rstore.load_resource(warc_path, offset, c_size) +            gwb_record = rstore.load_resource(warc_uri, offset, c_size)          except wayback.exception.ResourceUnavailable as err:              # XXX: during testing              raise err @@ -116,13 +121,15 @@ class MRExtractCdxGrobid(MRJob):          if gwb_record.get_status()[0] != 200:              self.increment_counter('lines', 'error') -            return None, dict(status="error", reason="non-HTTP-200 WARC content") -        return gwb_record.open_raw_content() +            return None, dict(status="error", +                reason="archived HTTP response (WARC) was not 200", +                warc_status=gwb_record.get_status()[0]) +        return gwb_record.open_raw_content().read(), None      def extract(self, info):          # Fetch data from WARCs in petabox -        content, status = self.fetch_warc_content( +        original_content, status = self.fetch_warc_content(              info['file:cdx']['warc'],              info['file:cdx']['offset'],              info['file:cdx']['c_size']) @@ -130,11 +137,11 @@ class MRExtractCdxGrobid(MRJob):              self.increment_counter('lines', status['status'])              return None, status -        info['file:size'] = len(content) +        info['file:size'] = len(original_content)          # Submit to GROBID          try: -            grobid_response = self.grobid_process_fulltext(content) +            grobid_response = self.grobid_process_fulltext(original_content)          except IOError as ioe:              raise ioe              # XXX: catch correct error @@ -179,7 +186,7 @@ class MRExtractCdxGrobid(MRJob):          # Check if we've already processed this line          oldrow = self.hb_table.row(info['key'], columns=['f', 'file', -            'grobid:status_code']) +            'grobid0:status_code'])          if oldrow.get('grobid0:status', None):              # This file has already been processed; skip it              self.increment_counter('lines', 'existing')  | 
