From 4225fe89836b72e771e612139d0f5561088a6909 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 21 Feb 2019 11:55:16 -0800 Subject: backport GWB fetch improvements to extraction/kafka workers *Really* need to refactor out these common methods into a base class. --- python/extraction_cdx_grobid.py | 28 +++++++++++++++++++++------- python/extraction_ungrobided.py | 28 +++++++++++++++++++++------- python/kafka_grobid.py | 12 ++++++++---- 3 files changed, 50 insertions(+), 18 deletions(-) (limited to 'python') diff --git a/python/extraction_cdx_grobid.py b/python/extraction_cdx_grobid.py index 76780b0..01d566e 100755 --- a/python/extraction_cdx_grobid.py +++ b/python/extraction_cdx_grobid.py @@ -26,10 +26,10 @@ import happybase import mrjob from mrjob.job import MRJob import wayback.exception -from wayback.resource import Resource -from wayback.resource import ArcResource +from http.client import IncompleteRead from wayback.resourcestore import ResourceStore from gwb.loader import CDXLoaderFactory + from common import parse_cdx_line from grobid2json import teixml2json @@ -74,8 +74,10 @@ class MRExtractCdxGrobid(MRJob): def __init__(self, *args, **kwargs): super(MRExtractCdxGrobid, self).__init__(*args, **kwargs) - self.mime_filter = ['application/pdf'] self.hb_table = None + self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET')) + self.mime_filter = ['application/pdf'] + self.rstore = None def grobid_process_fulltext(self, content): r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument", @@ -117,10 +119,13 @@ class MRExtractCdxGrobid(MRJob): return info, None def fetch_warc_content(self, warc_path, offset, c_size): - warc_uri = self.options.warc_uri_prefix + warc_path + warc_uri = self.warc_uri_prefix + warc_path + if not self.rstore: + self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory( + webdata_secret=self.petabox_webdata_secret, + download_base_url=self.petabox_base_url)) try: - rstore = ResourceStore(loaderfactory=CDXLoaderFactory()) - gwb_record = rstore.load_resource(warc_uri, offset, c_size) + gwb_record = self.rstore.load_resource(warc_uri, offset, c_size) except wayback.exception.ResourceUnavailable: return None, dict(status="error", reason="failed to load file contents from wayback/petabox (ResourceUnavailable)") @@ -130,6 +135,9 @@ class MRExtractCdxGrobid(MRJob): except EOFError as eofe: return None, dict(status="error", reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) + except TypeError as te: + return None, dict(status="error", + reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te)) # Note: could consider a generic "except Exception" here, as we get so # many petabox errors. Do want jobs to fail loud and clear when the # whole cluster is down though. @@ -138,7 +146,13 @@ class MRExtractCdxGrobid(MRJob): return None, dict(status="error", reason="archived HTTP response (WARC) was not 200", warc_status=gwb_record.get_status()[0]) - return gwb_record.open_raw_content().read(), None + + try: + raw_content = gwb_record.open_raw_content().read() + except IncompleteRead as ire: + return None, dict(status="error", + reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire)) + return raw_content, None def extract(self, info): diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py index 4b558dd..99d4f13 100755 --- a/python/extraction_ungrobided.py +++ b/python/extraction_ungrobided.py @@ -26,10 +26,10 @@ import happybase import mrjob from mrjob.job import MRJob import wayback.exception -from wayback.resource import Resource -from wayback.resource import ArcResource +from http.client import IncompleteRead from wayback.resourcestore import ResourceStore from gwb.loader import CDXLoaderFactory + from common import parse_ungrobided_line from grobid2json import teixml2json @@ -71,8 +71,10 @@ class MRExtractUnGrobided(MRJob): def __init__(self, *args, **kwargs): super(MRExtractUnGrobided, self).__init__(*args, **kwargs) - self.mime_filter = ['application/pdf'] self.hb_table = None + self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET')) + self.mime_filter = ['application/pdf'] + self.rstore = None def grobid_process_fulltext(self, content): r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument", @@ -120,10 +122,13 @@ class MRExtractUnGrobided(MRJob): return info, None def fetch_warc_content(self, warc_path, offset, c_size): - warc_uri = self.options.warc_uri_prefix + warc_path + warc_uri = self.warc_uri_prefix + warc_path + if not self.rstore: + self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory( + webdata_secret=self.petabox_webdata_secret, + download_base_url=self.petabox_base_url)) try: - rstore = ResourceStore(loaderfactory=CDXLoaderFactory()) - gwb_record = rstore.load_resource(warc_uri, offset, c_size) + gwb_record = self.rstore.load_resource(warc_uri, offset, c_size) except wayback.exception.ResourceUnavailable: return None, dict(status="error", reason="failed to load file contents from wayback/petabox (ResourceUnavailable)") @@ -133,6 +138,9 @@ class MRExtractUnGrobided(MRJob): except EOFError as eofe: return None, dict(status="error", reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) + except TypeError as te: + return None, dict(status="error", + reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te)) # Note: could consider a generic "except Exception" here, as we get so # many petabox errors. Do want jobs to fail loud and clear when the # whole cluster is down though. @@ -141,7 +149,13 @@ class MRExtractUnGrobided(MRJob): return None, dict(status="error", reason="archived HTTP response (WARC) was not 200", warc_status=gwb_record.get_status()[0]) - return gwb_record.open_raw_content().read(), None + + try: + raw_content = gwb_record.open_raw_content().read() + except IncompleteRead as ire: + return None, dict(status="error", + reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire)) + return raw_content, None def extract(self, info): diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py index 17908e5..ba84eee 100755 --- a/python/kafka_grobid.py +++ b/python/kafka_grobid.py @@ -37,13 +37,11 @@ import xml import json import raven import struct -import requests import argparse +import requests import pykafka import wayback.exception from http.client import IncompleteRead -from wayback.resource import Resource -from wayback.resource import ArcResource from wayback.resourcestore import ResourceStore from gwb.loader import CDXLoaderFactory @@ -66,6 +64,10 @@ class KafkaGrobidWorker: self.consumer_group = kwargs.get('consumer_group', 'grobid-extraction') self.kafka_hosts = kafka_hosts or 'localhost:9092' self.grobid_uri = kwargs.get('grobid_uri') + # /serve/ instead of /download/ doesn't record view count + self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/') + # gwb library will fall back to reading from /opt/.petabox/webdata.secret + self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET')) self.warc_uri_prefix = kwargs.get('warc_uri_prefix') self.mime_filter = ['application/pdf'] self.rstore = None @@ -104,7 +106,9 @@ class KafkaGrobidWorker: def fetch_warc_content(self, warc_path, offset, c_size): warc_uri = self.warc_uri_prefix + warc_path if not self.rstore: - self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory()) + self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory( + webdata_secret=self.petabox_webdata_secret, + download_base_url=self.petabox_base_url)) try: gwb_record = self.rstore.load_resource(warc_uri, offset, c_size) except wayback.exception.ResourceUnavailable: -- cgit v1.2.3