From bb46cd951a4cbe8664f02cc69c7983429e3cce3e Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 16 Jun 2020 17:27:25 -0700 Subject: refactor worker fetch code into wrapper class --- python/sandcrawler/grobid.py | 69 +++++-------------------------- python/sandcrawler/pdftrio.py | 94 +++++++------------------------------------ python/sandcrawler/workers.py | 89 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 111 insertions(+), 141 deletions(-) (limited to 'python') diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py index f329a73..d9db6c3 100644 --- a/python/sandcrawler/grobid.py +++ b/python/sandcrawler/grobid.py @@ -2,7 +2,7 @@ import requests from grobid2json import teixml2json -from .workers import SandcrawlerWorker +from .workers import SandcrawlerWorker, SandcrawlerFetchWorker from .misc import gen_file_metadata from .ia import WaybackClient, WaybackError, PetaboxError @@ -78,12 +78,11 @@ class GrobidClient(object): meta[k] = tei_json[k] return meta -class GrobidWorker(SandcrawlerWorker): +class GrobidWorker(SandcrawlerFetchWorker): def __init__(self, grobid_client, wayback_client=None, sink=None, **kwargs): - super().__init__() + super().__init__(wayback_client=wayback_client) self.grobid_client = grobid_client - self.wayback_client = wayback_client self.sink = sink self.consolidate_mode = 2 @@ -98,62 +97,12 @@ class GrobidWorker(SandcrawlerWorker): def process(self, record): default_key = record['sha1hex'] - if record.get('warc_path') and record.get('warc_offset'): - # it's a full CDX dict. fetch using WaybackClient - if not self.wayback_client: - raise Exception("wayback client not configured for this GrobidWorker") - try: - blob = self.wayback_client.fetch_petabox_body( - csize=record['warc_csize'], - offset=record['warc_offset'], - warc_path=record['warc_path'], - ) - except (WaybackError, PetaboxError) as we: - return dict( - status="error-wayback", - error_msg=str(we), - source=record, - key=default_key, - ) - elif record.get('url') and record.get('datetime'): - # it's a partial CDX dict or something? fetch using WaybackClient - if not self.wayback_client: - raise Exception("wayback client not configured for this GrobidWorker") - try: - blob = self.wayback_client.fetch_replay_body( - url=record['url'], - datetime=record['datetime'], - ) - except WaybackError as we: - return dict( - status="error-wayback", - error_msg=str(we), - source=record, - key=default_key, - ) - elif record.get('item') and record.get('path'): - # it's petabox link; fetch via HTTP - resp = requests.get("https://archive.org/serve/{}/{}".format( - record['item'], record['path'])) - try: - resp.raise_for_status() - except Exception as e: - return dict( - status="error-petabox", - error_msg=str(e), - source=record, - key=default_key, - ) - blob = resp.content - else: - raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed") - if not blob: - return dict( - status="error", - error_msg="empty blob", - source=record, - key=default_key, - ) + + fetch_result = self.fetch_blob(record) + if fetch_result['status'] != 'success': + return fetch_result + blob = fetch_result['blob'] + result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode) result['file_meta'] = gen_file_metadata(blob) result['source'] = record diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py index 41eed19..14d8d04 100644 --- a/python/sandcrawler/pdftrio.py +++ b/python/sandcrawler/pdftrio.py @@ -2,7 +2,7 @@ import time import requests -from .workers import SandcrawlerWorker +from .workers import SandcrawlerWorker, SandcrawlerFetchWorker from .misc import gen_file_metadata, requests_retry_session from .ia import WaybackClient, WaybackError, PetaboxError @@ -68,92 +68,28 @@ class PdfTrioClient(object): return info -class PdfTrioWorker(SandcrawlerWorker): +class PdfTrioWorker(SandcrawlerFetchWorker): """ This class is basically copied directly from GrobidWorker """ def __init__(self, pdftrio_client, wayback_client=None, sink=None, **kwargs): - super().__init__() + super().__init__(wayback_client=wayback_client) self.pdftrio_client = pdftrio_client - self.wayback_client = wayback_client self.sink = sink def process(self, record): start_process = time.time() default_key = record['sha1hex'] - wayback_sec = None - petabox_sec = None - if record.get('warc_path') and record.get('warc_offset'): - # it's a full CDX dict. fetch using WaybackClient - if not self.wayback_client: - raise Exception("wayback client not configured for this PdfTrioWorker") - try: - start = time.time() - blob = self.wayback_client.fetch_petabox_body( - csize=record['warc_csize'], - offset=record['warc_offset'], - warc_path=record['warc_path'], - ) - wayback_sec = time.time() - start - except (WaybackError, PetaboxError) as we: - return dict( - key=default_key, - source=record, - pdf_trio=dict( - status="error-wayback", - error_msg=str(we), - ), - ) - elif record.get('url') and record.get('datetime'): - # it's a partial CDX dict or something? fetch using WaybackClient - if not self.wayback_client: - raise Exception("wayback client not configured for this PdfTrioWorker") - try: - start = time.time() - blob = self.wayback_client.fetch_replay_body( - url=record['url'], - datetime=record['datetime'], - ) - wayback_sec = time.time() - start - except WaybackError as we: - return dict( - key=default_key, - source=record, - pdf_trio=dict( - status="error-wayback", - error_msg=str(we), - ), - ) - elif record.get('item') and record.get('path'): - # it's petabox link; fetch via HTTP - start = time.time() - resp = requests.get("https://archive.org/serve/{}/{}".format( - record['item'], record['path'])) - petabox_sec = time.time() - start - try: - resp.raise_for_status() - except Exception as e: - return dict( - key=default_key, - source=record, - pdf_trio=dict( - status="error-petabox", - error_msg=str(e), - ), - ) - blob = resp.content - else: - raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed") - if not blob: - return dict( - key=default_key, - source=record, - pdf_trio=dict( - status="error", - error_msg="empty blob", - ), - ) + fetch_sec = None + + start = time.time() + fetch_result = self.fetch_blob(record) + fetch_sec = time.time() - start + if fetch_result['status'] != 'success': + return fetch_result + blob = fetch_result['blob'] + result = dict() result['file_meta'] = gen_file_metadata(blob) result['key'] = result['file_meta']['sha1hex'] @@ -163,10 +99,8 @@ class PdfTrioWorker(SandcrawlerWorker): pdftrio_sec=result['pdf_trio'].pop('_total_sec', None), total_sec=time.time() - start_process, ) - if wayback_sec: - result['timing']['wayback_sec'] = wayback_sec - if petabox_sec: - result['timing']['petabox_sec'] = wayback_sec + if fetch_sec: + result['timing']['fetch_sec'] = fetch_sec return result class PdfTrioBlobWorker(SandcrawlerWorker): diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index a42b1a4..208b0e2 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -4,12 +4,13 @@ import json import time import signal import zipfile +import requests import multiprocessing.pool from collections import Counter from confluent_kafka import Consumer, Producer, KafkaException from .misc import parse_cdx_line -from .ia import SandcrawlerBackoffError +from .ia import SandcrawlerBackoffError, WaybackError, PetaboxError class SandcrawlerWorker(object): @@ -105,6 +106,92 @@ class SandcrawlerWorker(object): """ raise NotImplementedError('implementation required') + +class SandcrawlerFetchWorker(SandcrawlerWorker): + """ + Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg, + PDFs) from wayback, archive.org, or other sources. + """ + + def __init__(self, wayback_client, **kwargs): + super().__init__(**kwargs) + self.wayback_client = wayback_client + + def fetch_blob(self, record): + start_process = time.time() + default_key = record['sha1hex'] + wayback_sec = None + petabox_sec = None + + if record.get('warc_path') and record.get('warc_offset'): + # it's a full CDX dict. fetch using WaybackClient + if not self.wayback_client: + raise Exception("wayback client not configured for this PdfTrioWorker") + try: + start = time.time() + blob = self.wayback_client.fetch_petabox_body( + csize=record['warc_csize'], + offset=record['warc_offset'], + warc_path=record['warc_path'], + ) + wayback_sec = time.time() - start + except (WaybackError, PetaboxError) as we: + return dict( + key=default_key, + source=record, + pdf_trio=dict( + status="error-wayback", + error_msg=str(we), + ), + ) + elif record.get('url') and record.get('datetime'): + # it's a partial CDX dict or something? fetch using WaybackClient + if not self.wayback_client: + raise Exception("wayback client not configured for this PdfTrioWorker") + try: + start = time.time() + blob = self.wayback_client.fetch_replay_body( + url=record['url'], + datetime=record['datetime'], + ) + wayback_sec = time.time() - start + except WaybackError as we: + return dict( + key=default_key, + source=record, + pdf_trio=dict( + status="error-wayback", + error_msg=str(we), + ), + ) + elif record.get('item') and record.get('path'): + # it's petabox link; fetch via HTTP + start = time.time() + resp = requests.get("https://archive.org/serve/{}/{}".format( + record['item'], record['path'])) + petabox_sec = time.time() - start + try: + resp.raise_for_status() + except Exception as e: + return dict( + key=default_key, + source=record, + pdf_trio=dict( + status="error-petabox", + error_msg=str(e), + ), + ) + blob = resp.content + else: + raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed") + assert blob + return dict( + key=default_key, + status="success", + source=record, + blob=blob, + ) + class MultiprocessWrapper(SandcrawlerWorker): def __init__(self, worker, sink, jobs=None): -- cgit v1.2.3