author    Bryan Newbold <bnewbold@archive.org>  2020-06-16 17:27:25 -0700
committer Bryan Newbold <bnewbold@archive.org>  2020-06-16 17:27:25 -0700
commit    bb46cd951a4cbe8664f02cc69c7983429e3cce3e
tree      4f870edf064f1ec72c9d3ed46d72633b877bd564
parent    9df71395046d045d7f8b568a55de4ea000de8791
refactor worker fetch code into wrapper class
python/sandcrawler/grobid.py  | 69
python/sandcrawler/pdftrio.py | 94
python/sandcrawler/workers.py | 89
3 files changed, 111 insertions(+), 141 deletions(-)
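
This commit deduplicates the blob-fetching logic that GrobidWorker and PdfTrioWorker had each copy/pasted: both workers now inherit from a new SandcrawlerFetchWorker base class, which owns the wayback client and exposes a single fetch_blob() helper. A minimal sketch of the resulting shape (class and method names are taken from the diff below; the bodies here are illustrative, not the full implementation):

    class SandcrawlerFetchWorker(SandcrawlerWorker):
        """Owns the WaybackClient; dispatches on record shape (CDX, replay, petabox)."""
        def __init__(self, wayback_client, **kwargs):
            super().__init__(**kwargs)
            self.wayback_client = wayback_client

        def fetch_blob(self, record):
            # returns dict(status="success", blob=..., key=..., source=...)
            # or an error dict with status="error-*" and an error_msg
            ...

    class GrobidWorker(SandcrawlerFetchWorker):
        def process(self, record):
            fetch_result = self.fetch_blob(record)
            if fetch_result['status'] != 'success':
                return fetch_result
            blob = fetch_result['blob']
            # ... hand blob to GROBID, attach metadata, return result ...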
diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py
index f329a73..d9db6c3 100644
--- a/python/sandcrawler/grobid.py
+++ b/python/sandcrawler/grobid.py
@@ -2,7 +2,7 @@
import requests
from grobid2json import teixml2json
-from .workers import SandcrawlerWorker
+from .workers import SandcrawlerWorker, SandcrawlerFetchWorker
from .misc import gen_file_metadata
from .ia import WaybackClient, WaybackError, PetaboxError
@@ -78,12 +78,11 @@ class GrobidClient(object):
meta[k] = tei_json[k]
return meta
-class GrobidWorker(SandcrawlerWorker):
+class GrobidWorker(SandcrawlerFetchWorker):
def __init__(self, grobid_client, wayback_client=None, sink=None, **kwargs):
- super().__init__()
+ super().__init__(wayback_client=wayback_client)
self.grobid_client = grobid_client
- self.wayback_client = wayback_client
self.sink = sink
self.consolidate_mode = 2
@@ -98,62 +97,12 @@ class GrobidWorker(SandcrawlerWorker):
def process(self, record):
default_key = record['sha1hex']
- if record.get('warc_path') and record.get('warc_offset'):
- # it's a full CDX dict. fetch using WaybackClient
- if not self.wayback_client:
- raise Exception("wayback client not configured for this GrobidWorker")
- try:
- blob = self.wayback_client.fetch_petabox_body(
- csize=record['warc_csize'],
- offset=record['warc_offset'],
- warc_path=record['warc_path'],
- )
- except (WaybackError, PetaboxError) as we:
- return dict(
- status="error-wayback",
- error_msg=str(we),
- source=record,
- key=default_key,
- )
- elif record.get('url') and record.get('datetime'):
- # it's a partial CDX dict or something? fetch using WaybackClient
- if not self.wayback_client:
- raise Exception("wayback client not configured for this GrobidWorker")
- try:
- blob = self.wayback_client.fetch_replay_body(
- url=record['url'],
- datetime=record['datetime'],
- )
- except WaybackError as we:
- return dict(
- status="error-wayback",
- error_msg=str(we),
- source=record,
- key=default_key,
- )
- elif record.get('item') and record.get('path'):
- # it's petabox link; fetch via HTTP
- resp = requests.get("https://archive.org/serve/{}/{}".format(
- record['item'], record['path']))
- try:
- resp.raise_for_status()
- except Exception as e:
- return dict(
- status="error-petabox",
- error_msg=str(e),
- source=record,
- key=default_key,
- )
- blob = resp.content
- else:
- raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
- if not blob:
- return dict(
- status="error",
- error_msg="empty blob",
- source=record,
- key=default_key,
- )
+
+ fetch_result = self.fetch_blob(record)
+ if fetch_result['status'] != 'success':
+ return fetch_result
+ blob = fetch_result['blob']
+
result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode)
result['file_meta'] = gen_file_metadata(blob)
result['source'] = record
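
With fetching hoisted into the base class, GrobidWorker.process() reduces to a fetch/check/use sequence. The contract it relies on is that fetch_blob() always returns a dict with a top-level 'status' key (the shapes below are illustrative):

    fetch_result = self.fetch_blob(record)
    # success: {'status': 'success', 'key': ..., 'source': record, 'blob': b'%PDF-...'}
    # failure: {'status': 'error-wayback', 'key': ..., 'source': record, 'error_msg': '...'}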
diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py
index 41eed19..14d8d04 100644
--- a/python/sandcrawler/pdftrio.py
+++ b/python/sandcrawler/pdftrio.py
@@ -2,7 +2,7 @@
import time
import requests
-from .workers import SandcrawlerWorker
+from .workers import SandcrawlerWorker, SandcrawlerFetchWorker
from .misc import gen_file_metadata, requests_retry_session
from .ia import WaybackClient, WaybackError, PetaboxError
@@ -68,92 +68,28 @@ class PdfTrioClient(object):
return info
-class PdfTrioWorker(SandcrawlerWorker):
+class PdfTrioWorker(SandcrawlerFetchWorker):
"""
This class is basically copied directly from GrobidWorker
"""
def __init__(self, pdftrio_client, wayback_client=None, sink=None, **kwargs):
- super().__init__()
+ super().__init__(wayback_client=wayback_client)
self.pdftrio_client = pdftrio_client
- self.wayback_client = wayback_client
self.sink = sink
def process(self, record):
start_process = time.time()
default_key = record['sha1hex']
- wayback_sec = None
- petabox_sec = None
- if record.get('warc_path') and record.get('warc_offset'):
- # it's a full CDX dict. fetch using WaybackClient
- if not self.wayback_client:
- raise Exception("wayback client not configured for this PdfTrioWorker")
- try:
- start = time.time()
- blob = self.wayback_client.fetch_petabox_body(
- csize=record['warc_csize'],
- offset=record['warc_offset'],
- warc_path=record['warc_path'],
- )
- wayback_sec = time.time() - start
- except (WaybackError, PetaboxError) as we:
- return dict(
- key=default_key,
- source=record,
- pdf_trio=dict(
- status="error-wayback",
- error_msg=str(we),
- ),
- )
- elif record.get('url') and record.get('datetime'):
- # it's a partial CDX dict or something? fetch using WaybackClient
- if not self.wayback_client:
- raise Exception("wayback client not configured for this PdfTrioWorker")
- try:
- start = time.time()
- blob = self.wayback_client.fetch_replay_body(
- url=record['url'],
- datetime=record['datetime'],
- )
- wayback_sec = time.time() - start
- except WaybackError as we:
- return dict(
- key=default_key,
- source=record,
- pdf_trio=dict(
- status="error-wayback",
- error_msg=str(we),
- ),
- )
- elif record.get('item') and record.get('path'):
- # it's petabox link; fetch via HTTP
- start = time.time()
- resp = requests.get("https://archive.org/serve/{}/{}".format(
- record['item'], record['path']))
- petabox_sec = time.time() - start
- try:
- resp.raise_for_status()
- except Exception as e:
- return dict(
- key=default_key,
- source=record,
- pdf_trio=dict(
- status="error-petabox",
- error_msg=str(e),
- ),
- )
- blob = resp.content
- else:
- raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
- if not blob:
- return dict(
- key=default_key,
- source=record,
- pdf_trio=dict(
- status="error",
- error_msg="empty blob",
- ),
- )
+ fetch_sec = None
+
+ start = time.time()
+ fetch_result = self.fetch_blob(record)
+ fetch_sec = time.time() - start
+ if fetch_result['status'] != 'success':
+ return fetch_result
+ blob = fetch_result['blob']
+
result = dict()
result['file_meta'] = gen_file_metadata(blob)
result['key'] = result['file_meta']['sha1hex']
@@ -163,10 +99,8 @@ class PdfTrioWorker(SandcrawlerWorker):
pdftrio_sec=result['pdf_trio'].pop('_total_sec', None),
total_sec=time.time() - start_process,
)
- if wayback_sec:
- result['timing']['wayback_sec'] = wayback_sec
- if petabox_sec:
- result['timing']['petabox_sec'] = wayback_sec
+ if fetch_sec:
+ result['timing']['fetch_sec'] = fetch_sec
return result
class PdfTrioBlobWorker(SandcrawlerWorker):
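
The timing change in the hunk above is worth noting: the separate wayback_sec and petabox_sec measurements collapse into a single fetch_sec taken around the fetch_blob() call, which incidentally retires the old copy/paste bug where petabox_sec was reported with the wayback_sec value. Roughly:

    start = time.time()
    fetch_result = self.fetch_blob(record)
    fetch_sec = time.time() - start  # one timer, regardless of which fetch path ran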
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index a42b1a4..208b0e2 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -4,12 +4,13 @@ import json
import time
import signal
import zipfile
+import requests
import multiprocessing.pool
from collections import Counter
from confluent_kafka import Consumer, Producer, KafkaException
from .misc import parse_cdx_line
-from .ia import SandcrawlerBackoffError
+from .ia import SandcrawlerBackoffError, WaybackError, PetaboxError
class SandcrawlerWorker(object):
@@ -105,6 +106,92 @@ class SandcrawlerWorker(object):
"""
raise NotImplementedError('implementation required')
+
+class SandcrawlerFetchWorker(SandcrawlerWorker):
+ """
+ Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (e.g.,
+ PDFs) from wayback, archive.org, or other sources.
+ """
+
+ def __init__(self, wayback_client, **kwargs):
+ super().__init__(**kwargs)
+ self.wayback_client = wayback_client
+
+ def fetch_blob(self, record):
+ start_process = time.time()
+ default_key = record['sha1hex']
+ wayback_sec = None
+ petabox_sec = None
+
+ if record.get('warc_path') and record.get('warc_offset'):
+ # it's a full CDX dict. fetch using WaybackClient
+ if not self.wayback_client:
+ raise Exception("wayback client not configured for this PdfTrioWorker")
+ try:
+ start = time.time()
+ blob = self.wayback_client.fetch_petabox_body(
+ csize=record['warc_csize'],
+ offset=record['warc_offset'],
+ warc_path=record['warc_path'],
+ )
+ wayback_sec = time.time() - start
+ except (WaybackError, PetaboxError) as we:
+ return dict(
+ key=default_key,
+ source=record,
+ status="error-wayback",
+ error_msg=str(we),
+ )
+ elif record.get('url') and record.get('datetime'):
+ # it's a partial CDX dict or something? fetch using WaybackClient
+ if not self.wayback_client:
+ raise Exception("wayback client not configured for this PdfTrioWorker")
+ try:
+ start = time.time()
+ blob = self.wayback_client.fetch_replay_body(
+ url=record['url'],
+ datetime=record['datetime'],
+ )
+ wayback_sec = time.time() - start
+ except WaybackError as we:
+ return dict(
+ key=default_key,
+ source=record,
+ status="error-wayback",
+ error_msg=str(we),
+ )
+ elif record.get('item') and record.get('path'):
+ # it's a petabox link; fetch via HTTP
+ start = time.time()
+ resp = requests.get("https://archive.org/serve/{}/{}".format(
+ record['item'], record['path']))
+ petabox_sec = time.time() - start
+ try:
+ resp.raise_for_status()
+ except Exception as e:
+ return dict(
+ key=default_key,
+ source=record,
+ status="error-petabox",
+ error_msg=str(e),
+ )
+ blob = resp.content
+ else:
+ raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+ if not blob:
+ return dict(
+ key=default_key,
+ source=record,
+ status="error",
+ error_msg="empty blob",
+ )
+ return dict(
+ key=default_key,
+ status="success",
+ source=record,
+ blob=blob,
+ )
+
class MultiprocessWrapper(SandcrawlerWorker):
def __init__(self, worker, sink, jobs=None):
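
For reference, a hedged end-to-end sketch of driving the new wrapper; the three record shapes are the cases fetch_blob() dispatches on, and all field values here are illustrative placeholders:

    worker = GrobidWorker(grobid_client, wayback_client=wayback_client)

    # full CDX dict -> WaybackClient.fetch_petabox_body()
    worker.process({'sha1hex': '...', 'warc_path': 'x.warc.gz', 'warc_offset': 123, 'warc_csize': 456})

    # partial CDX dict -> WaybackClient.fetch_replay_body()
    worker.process({'sha1hex': '...', 'url': 'https://example.com/paper.pdf', 'datetime': '20200101000000'})

    # petabox item/path -> HTTP GET against archive.org/serve/
    worker.process({'sha1hex': '...', 'item': 'some-item', 'path': 'paper.pdf'})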