about summary refs log tree commit diff stats
path: root/python/extraction_ungrobided.py
diff options
context:
space:
mode:
author	Bryan Newbold <bnewbold@archive.org>	2019-02-21 11:55:16 -0800
committer	Bryan Newbold <bnewbold@archive.org>	2019-02-21 11:55:20 -0800
commit	4225fe89836b72e771e612139d0f5561088a6909 (patch)
tree	3a6a5a24735e1b4212419bdca01a96392be5130e /python/extraction_ungrobided.py
parent	f59e9895d3c9d198538b40e36d3b0cc3b4bb5b92 (diff)
download	sandcrawler-4225fe89836b72e771e612139d0f5561088a6909.tar.gz
download	sandcrawler-4225fe89836b72e771e612139d0f5561088a6909.zip
backport GWB fetch improvements to extraction/kafka workers
*Really* need to refactor out these common methods into a base class.
Diffstat (limited to 'python/extraction_ungrobided.py')
-rwxr-xr-x	python/extraction_ungrobided.py	28
1 file changed, 21 insertions, 7 deletions
diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py
index 4b558dd..99d4f13 100755
--- a/python/extraction_ungrobided.py
+++ b/python/extraction_ungrobided.py
@@ -26,10 +26,10 @@ import happybase
import mrjob
from mrjob.job import MRJob
import wayback.exception
-from wayback.resource import Resource
-from wayback.resource import ArcResource
+from http.client import IncompleteRead
from wayback.resourcestore import ResourceStore
from gwb.loader import CDXLoaderFactory
+
from common import parse_ungrobided_line
from grobid2json import teixml2json
@@ -71,8 +71,10 @@ class MRExtractUnGrobided(MRJob):
def __init__(self, *args, **kwargs):
super(MRExtractUnGrobided, self).__init__(*args, **kwargs)
- self.mime_filter = ['application/pdf']
self.hb_table = None
+ self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET'))
+ self.mime_filter = ['application/pdf']
+ self.rstore = None
def grobid_process_fulltext(self, content):
r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
@@ -120,10 +122,13 @@ class MRExtractUnGrobided(MRJob):
return info, None
def fetch_warc_content(self, warc_path, offset, c_size):
- warc_uri = self.options.warc_uri_prefix + warc_path
+ warc_uri = self.warc_uri_prefix + warc_path
+ if not self.rstore:
+ self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory(
+ webdata_secret=self.petabox_webdata_secret,
+ download_base_url=self.petabox_base_url))
try:
- rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
- gwb_record = rstore.load_resource(warc_uri, offset, c_size)
+ gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
except wayback.exception.ResourceUnavailable:
return None, dict(status="error",
reason="failed to load file contents from wayback/petabox (ResourceUnavailable)")
@@ -133,6 +138,9 @@ class MRExtractUnGrobided(MRJob):
except EOFError as eofe:
return None, dict(status="error",
reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
+ except TypeError as te:
+ return None, dict(status="error",
+ reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
# Note: could consider a generic "except Exception" here, as we get so
# many petabox errors. Do want jobs to fail loud and clear when the
# whole cluster is down though.
@@ -141,7 +149,13 @@ class MRExtractUnGrobided(MRJob):
return None, dict(status="error",
reason="archived HTTP response (WARC) was not 200",
warc_status=gwb_record.get_status()[0])
- return gwb_record.open_raw_content().read(), None
+
+ try:
+ raw_content = gwb_record.open_raw_content().read()
+ except IncompleteRead as ire:
+ return None, dict(status="error",
+ reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
+ return raw_content, None
def extract(self, info):