diff options
author | Bryan Newbold <bnewbold@archive.org> | 2019-02-21 11:55:16 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2019-02-21 11:55:20 -0800 |
commit | 4225fe89836b72e771e612139d0f5561088a6909 (patch) | |
tree | 3a6a5a24735e1b4212419bdca01a96392be5130e /python/extraction_ungrobided.py | |
parent | f59e9895d3c9d198538b40e36d3b0cc3b4bb5b92 (diff) | |
download | sandcrawler-4225fe89836b72e771e612139d0f5561088a6909.tar.gz sandcrawler-4225fe89836b72e771e612139d0f5561088a6909.zip |
backport GWB fetch improvements to extraction/kafka workers
We *really* need to refactor these common methods out into a shared base class.
Diffstat (limited to 'python/extraction_ungrobided.py')
-rwxr-xr-x | python/extraction_ungrobided.py | 28 |
1 file changed, 21 insertions, 7 deletions
diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py index 4b558dd..99d4f13 100755 --- a/python/extraction_ungrobided.py +++ b/python/extraction_ungrobided.py @@ -26,10 +26,10 @@ import happybase import mrjob from mrjob.job import MRJob import wayback.exception -from wayback.resource import Resource -from wayback.resource import ArcResource +from http.client import IncompleteRead from wayback.resourcestore import ResourceStore from gwb.loader import CDXLoaderFactory + from common import parse_ungrobided_line from grobid2json import teixml2json @@ -71,8 +71,10 @@ class MRExtractUnGrobided(MRJob): def __init__(self, *args, **kwargs): super(MRExtractUnGrobided, self).__init__(*args, **kwargs) - self.mime_filter = ['application/pdf'] self.hb_table = None + self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET')) + self.mime_filter = ['application/pdf'] + self.rstore = None def grobid_process_fulltext(self, content): r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument", @@ -120,10 +122,13 @@ class MRExtractUnGrobided(MRJob): return info, None def fetch_warc_content(self, warc_path, offset, c_size): - warc_uri = self.options.warc_uri_prefix + warc_path + warc_uri = self.warc_uri_prefix + warc_path + if not self.rstore: + self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory( + webdata_secret=self.petabox_webdata_secret, + download_base_url=self.petabox_base_url)) try: - rstore = ResourceStore(loaderfactory=CDXLoaderFactory()) - gwb_record = rstore.load_resource(warc_uri, offset, c_size) + gwb_record = self.rstore.load_resource(warc_uri, offset, c_size) except wayback.exception.ResourceUnavailable: return None, dict(status="error", reason="failed to load file contents from wayback/petabox (ResourceUnavailable)") @@ -133,6 +138,9 @@ class MRExtractUnGrobided(MRJob): except EOFError as eofe: return None, dict(status="error", reason="failed to load file contents from 
wayback/petabox (EOFError: {})".format(eofe)) + except TypeError as te: + return None, dict(status="error", + reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te)) # Note: could consider a generic "except Exception" here, as we get so # many petabox errors. Do want jobs to fail loud and clear when the # whole cluster is down though. @@ -141,7 +149,13 @@ class MRExtractUnGrobided(MRJob): return None, dict(status="error", reason="archived HTTP response (WARC) was not 200", warc_status=gwb_record.get_status()[0]) - return gwb_record.open_raw_content().read(), None + + try: + raw_content = gwb_record.open_raw_content().read() + except IncompleteRead as ire: + return None, dict(status="error", + reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire)) + return raw_content, None def extract(self, info): |