diff options
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r-- | python/sandcrawler/workers.py | 89 |
1 files changed, 88 insertions, 1 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index a42b1a4..208b0e2 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -4,12 +4,13 @@ import json import time import signal import zipfile +import requests import multiprocessing.pool from collections import Counter from confluent_kafka import Consumer, Producer, KafkaException from .misc import parse_cdx_line -from .ia import SandcrawlerBackoffError +from .ia import SandcrawlerBackoffError, WaybackError, PetaboxError class SandcrawlerWorker(object): @@ -105,6 +106,92 @@ class SandcrawlerWorker(object): """ raise NotImplementedError('implementation required') + +class SandcrawlerFetchWorker(SandcrawlerWorker): + """ + Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg, + PDFs) from wayback, archive.org, or other sources. + """ + + def __init__(self, wayback_client, **kwargs): + super().__init__(**kwargs) + self.wayback_client = wayback_client + + def fetch_blob(self, record): + start_process = time.time() + default_key = record['sha1hex'] + wayback_sec = None + petabox_sec = None + + if record.get('warc_path') and record.get('warc_offset'): + # it's a full CDX dict. fetch using WaybackClient + if not self.wayback_client: + raise Exception("wayback client not configured for this PdfTrioWorker") + try: + start = time.time() + blob = self.wayback_client.fetch_petabox_body( + csize=record['warc_csize'], + offset=record['warc_offset'], + warc_path=record['warc_path'], + ) + wayback_sec = time.time() - start + except (WaybackError, PetaboxError) as we: + return dict( + key=default_key, + source=record, + pdf_trio=dict( + status="error-wayback", + error_msg=str(we), + ), + ) + elif record.get('url') and record.get('datetime'): + # it's a partial CDX dict or something? fetch using WaybackClient + if not self.wayback_client: + raise Exception("wayback client not configured for this PdfTrioWorker") + try: + start = time.time() + blob = self.wayback_client.fetch_replay_body( + url=record['url'], + datetime=record['datetime'], + ) + wayback_sec = time.time() - start + except WaybackError as we: + return dict( + key=default_key, + source=record, + pdf_trio=dict( + status="error-wayback", + error_msg=str(we), + ), + ) + elif record.get('item') and record.get('path'): + # it's petabox link; fetch via HTTP + start = time.time() + resp = requests.get("https://archive.org/serve/{}/{}".format( + record['item'], record['path'])) + petabox_sec = time.time() - start + try: + resp.raise_for_status() + except Exception as e: + return dict( + key=default_key, + source=record, + pdf_trio=dict( + status="error-petabox", + error_msg=str(e), + ), + ) + blob = resp.content + else: + raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed") + assert blob + return dict( + key=default_key, + status="success", + source=record, + blob=blob, + ) + class MultiprocessWrapper(SandcrawlerWorker): def __init__(self, worker, sink, jobs=None): |