aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2019-02-21 11:55:16 -0800
committerBryan Newbold <bnewbold@archive.org>2019-02-21 11:55:20 -0800
commit4225fe89836b72e771e612139d0f5561088a6909 (patch)
tree3a6a5a24735e1b4212419bdca01a96392be5130e
parentf59e9895d3c9d198538b40e36d3b0cc3b4bb5b92 (diff)
downloadsandcrawler-4225fe89836b72e771e612139d0f5561088a6909.tar.gz
sandcrawler-4225fe89836b72e771e612139d0f5561088a6909.zip
backport GWB fetch improvements to extraction/kafka workers
*Really* need to refactor out these common methods into a base class.
-rwxr-xr-xpython/extraction_cdx_grobid.py28
-rwxr-xr-xpython/extraction_ungrobided.py28
-rwxr-xr-xpython/kafka_grobid.py12
3 files changed, 50 insertions, 18 deletions
diff --git a/python/extraction_cdx_grobid.py b/python/extraction_cdx_grobid.py
index 76780b0..01d566e 100755
--- a/python/extraction_cdx_grobid.py
+++ b/python/extraction_cdx_grobid.py
@@ -26,10 +26,10 @@ import happybase
import mrjob
from mrjob.job import MRJob
import wayback.exception
-from wayback.resource import Resource
-from wayback.resource import ArcResource
+from http.client import IncompleteRead
from wayback.resourcestore import ResourceStore
from gwb.loader import CDXLoaderFactory
+
from common import parse_cdx_line
from grobid2json import teixml2json
@@ -74,8 +74,10 @@ class MRExtractCdxGrobid(MRJob):
def __init__(self, *args, **kwargs):
super(MRExtractCdxGrobid, self).__init__(*args, **kwargs)
- self.mime_filter = ['application/pdf']
self.hb_table = None
+ self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET'))
+ self.mime_filter = ['application/pdf']
+ self.rstore = None
def grobid_process_fulltext(self, content):
r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
@@ -117,10 +119,13 @@ class MRExtractCdxGrobid(MRJob):
return info, None
def fetch_warc_content(self, warc_path, offset, c_size):
- warc_uri = self.options.warc_uri_prefix + warc_path
+ warc_uri = self.warc_uri_prefix + warc_path
+ if not self.rstore:
+ self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory(
+ webdata_secret=self.petabox_webdata_secret,
+ download_base_url=self.petabox_base_url))
try:
- rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
- gwb_record = rstore.load_resource(warc_uri, offset, c_size)
+ gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
except wayback.exception.ResourceUnavailable:
return None, dict(status="error",
reason="failed to load file contents from wayback/petabox (ResourceUnavailable)")
@@ -130,6 +135,9 @@ class MRExtractCdxGrobid(MRJob):
except EOFError as eofe:
return None, dict(status="error",
reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
+ except TypeError as te:
+ return None, dict(status="error",
+ reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
# Note: could consider a generic "except Exception" here, as we get so
# many petabox errors. Do want jobs to fail loud and clear when the
# whole cluster is down though.
@@ -138,7 +146,13 @@ class MRExtractCdxGrobid(MRJob):
return None, dict(status="error",
reason="archived HTTP response (WARC) was not 200",
warc_status=gwb_record.get_status()[0])
- return gwb_record.open_raw_content().read(), None
+
+ try:
+ raw_content = gwb_record.open_raw_content().read()
+ except IncompleteRead as ire:
+ return None, dict(status="error",
+ reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
+ return raw_content, None
def extract(self, info):
diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py
index 4b558dd..99d4f13 100755
--- a/python/extraction_ungrobided.py
+++ b/python/extraction_ungrobided.py
@@ -26,10 +26,10 @@ import happybase
import mrjob
from mrjob.job import MRJob
import wayback.exception
-from wayback.resource import Resource
-from wayback.resource import ArcResource
+from http.client import IncompleteRead
from wayback.resourcestore import ResourceStore
from gwb.loader import CDXLoaderFactory
+
from common import parse_ungrobided_line
from grobid2json import teixml2json
@@ -71,8 +71,10 @@ class MRExtractUnGrobided(MRJob):
def __init__(self, *args, **kwargs):
super(MRExtractUnGrobided, self).__init__(*args, **kwargs)
- self.mime_filter = ['application/pdf']
self.hb_table = None
+ self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET'))
+ self.mime_filter = ['application/pdf']
+ self.rstore = None
def grobid_process_fulltext(self, content):
r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
@@ -120,10 +122,13 @@ class MRExtractUnGrobided(MRJob):
return info, None
def fetch_warc_content(self, warc_path, offset, c_size):
- warc_uri = self.options.warc_uri_prefix + warc_path
+ warc_uri = self.warc_uri_prefix + warc_path
+ if not self.rstore:
+ self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory(
+ webdata_secret=self.petabox_webdata_secret,
+ download_base_url=self.petabox_base_url))
try:
- rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
- gwb_record = rstore.load_resource(warc_uri, offset, c_size)
+ gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
except wayback.exception.ResourceUnavailable:
return None, dict(status="error",
reason="failed to load file contents from wayback/petabox (ResourceUnavailable)")
@@ -133,6 +138,9 @@ class MRExtractUnGrobided(MRJob):
except EOFError as eofe:
return None, dict(status="error",
reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
+ except TypeError as te:
+ return None, dict(status="error",
+ reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
# Note: could consider a generic "except Exception" here, as we get so
# many petabox errors. Do want jobs to fail loud and clear when the
# whole cluster is down though.
@@ -141,7 +149,13 @@ class MRExtractUnGrobided(MRJob):
return None, dict(status="error",
reason="archived HTTP response (WARC) was not 200",
warc_status=gwb_record.get_status()[0])
- return gwb_record.open_raw_content().read(), None
+
+ try:
+ raw_content = gwb_record.open_raw_content().read()
+ except IncompleteRead as ire:
+ return None, dict(status="error",
+ reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
+ return raw_content, None
def extract(self, info):
diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py
index 17908e5..ba84eee 100755
--- a/python/kafka_grobid.py
+++ b/python/kafka_grobid.py
@@ -37,13 +37,11 @@ import xml
import json
import raven
import struct
-import requests
import argparse
+import requests
import pykafka
import wayback.exception
from http.client import IncompleteRead
-from wayback.resource import Resource
-from wayback.resource import ArcResource
from wayback.resourcestore import ResourceStore
from gwb.loader import CDXLoaderFactory
@@ -66,6 +64,10 @@ class KafkaGrobidWorker:
self.consumer_group = kwargs.get('consumer_group', 'grobid-extraction')
self.kafka_hosts = kafka_hosts or 'localhost:9092'
self.grobid_uri = kwargs.get('grobid_uri')
+ # /serve/ instead of /download/ doesn't record view count
+ self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/')
+ # gwb library will fall back to reading from /opt/.petabox/webdata.secret
+ self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET'))
self.warc_uri_prefix = kwargs.get('warc_uri_prefix')
self.mime_filter = ['application/pdf']
self.rstore = None
@@ -104,7 +106,9 @@ class KafkaGrobidWorker:
def fetch_warc_content(self, warc_path, offset, c_size):
warc_uri = self.warc_uri_prefix + warc_path
if not self.rstore:
- self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
+ self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory(
+ webdata_secret=self.petabox_webdata_secret,
+ download_base_url=self.petabox_base_url))
try:
gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
except wayback.exception.ResourceUnavailable: