aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/workers.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-06-16 17:27:25 -0700
committerBryan Newbold <bnewbold@archive.org>2020-06-16 17:27:25 -0700
commitbb46cd951a4cbe8664f02cc69c7983429e3cce3e (patch)
tree4f870edf064f1ec72c9d3ed46d72633b877bd564 /python/sandcrawler/workers.py
parent9df71395046d045d7f8b568a55de4ea000de8791 (diff)
downloadsandcrawler-bb46cd951a4cbe8664f02cc69c7983429e3cce3e.tar.gz
sandcrawler-bb46cd951a4cbe8664f02cc69c7983429e3cce3e.zip
refactor worker fetch code into wrapper class
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--python/sandcrawler/workers.py89
1 files changed, 88 insertions, 1 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index a42b1a4..208b0e2 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -4,12 +4,13 @@ import json
import time
import signal
import zipfile
+import requests
import multiprocessing.pool
from collections import Counter
from confluent_kafka import Consumer, Producer, KafkaException
from .misc import parse_cdx_line
-from .ia import SandcrawlerBackoffError
+from .ia import SandcrawlerBackoffError, WaybackError, PetaboxError
class SandcrawlerWorker(object):
@@ -105,6 +106,92 @@ class SandcrawlerWorker(object):
"""
raise NotImplementedError('implementation required')
+
+class SandcrawlerFetchWorker(SandcrawlerWorker):
+ """
+ Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg,
+ PDFs) from wayback, archive.org, or other sources.
+ """
+
+ def __init__(self, wayback_client, **kwargs):
+ super().__init__(**kwargs)
+ self.wayback_client = wayback_client
+
+ def fetch_blob(self, record):
+ start_process = time.time()
+ default_key = record['sha1hex']
+ wayback_sec = None
+ petabox_sec = None
+
+ if record.get('warc_path') and record.get('warc_offset'):
+ # it's a full CDX dict. fetch using WaybackClient
+ if not self.wayback_client:
+ raise Exception("wayback client not configured for this PdfTrioWorker")
+ try:
+ start = time.time()
+ blob = self.wayback_client.fetch_petabox_body(
+ csize=record['warc_csize'],
+ offset=record['warc_offset'],
+ warc_path=record['warc_path'],
+ )
+ wayback_sec = time.time() - start
+ except (WaybackError, PetaboxError) as we:
+ return dict(
+ key=default_key,
+ source=record,
+ pdf_trio=dict(
+ status="error-wayback",
+ error_msg=str(we),
+ ),
+ )
+ elif record.get('url') and record.get('datetime'):
+ # it's a partial CDX dict or something? fetch using WaybackClient
+ if not self.wayback_client:
+ raise Exception("wayback client not configured for this PdfTrioWorker")
+ try:
+ start = time.time()
+ blob = self.wayback_client.fetch_replay_body(
+ url=record['url'],
+ datetime=record['datetime'],
+ )
+ wayback_sec = time.time() - start
+ except WaybackError as we:
+ return dict(
+ key=default_key,
+ source=record,
+ pdf_trio=dict(
+ status="error-wayback",
+ error_msg=str(we),
+ ),
+ )
+ elif record.get('item') and record.get('path'):
+ # it's petabox link; fetch via HTTP
+ start = time.time()
+ resp = requests.get("https://archive.org/serve/{}/{}".format(
+ record['item'], record['path']))
+ petabox_sec = time.time() - start
+ try:
+ resp.raise_for_status()
+ except Exception as e:
+ return dict(
+ key=default_key,
+ source=record,
+ pdf_trio=dict(
+ status="error-petabox",
+ error_msg=str(e),
+ ),
+ )
+ blob = resp.content
+ else:
+ raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+ assert blob
+ return dict(
+ key=default_key,
+ status="success",
+ source=record,
+ blob=blob,
+ )
+
class MultiprocessWrapper(SandcrawlerWorker):
def __init__(self, worker, sink, jobs=None):