aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rwxr-xr-xpython/grobid_tool.py87
-rw-r--r--python/sandcrawler/__init__.py5
-rw-r--r--python/sandcrawler/grobid.py66
-rw-r--r--python/sandcrawler/ia.py135
-rw-r--r--python/sandcrawler/misc.py16
-rw-r--r--python/sandcrawler/workers.py419
-rw-r--r--python/tests/test_grobid.py30
-rw-r--r--python/tests/test_misc.py6
8 files changed, 748 insertions, 16 deletions
diff --git a/python/grobid_tool.py b/python/grobid_tool.py
new file mode 100755
index 0000000..c9bcdc8
--- /dev/null
+++ b/python/grobid_tool.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python3
+
+"""
+These are generally for running one-off tasks from the command line. Output
+might go to stdout, or might go to Kafka topic.
+"""
+
+import sys
+import argparse
+import datetime
+
+from sandcrawler import *
+
+
+def run_extract_json(args):
+ grobid_client = GrobidClient(host_url=args.grobid_host)
+ wayback_client = WaybackClient()
+ worker = GrobidWorker(grobid_client, wayback_client, sink=None)
+ multi_worker = MultiprocessWrapper(worker, args.sink)
+ pusher = JsonBatchPusher(worker, args.json_file, batch_size=30)
+ pusher.run()
+
+def run_extract_cdx(args):
+ grobid_client = GrobidClient(host_url=args.grobid_host)
+ wayback_client = WaybackClient()
+ worker = GrobidWorker(grobid_client, wayback_client, sink=None)
+ multi_worker = MultiprocessWrapper(worker, args.sink)
+ pusher = CdxLinePusher(multi_worker, args.cdx_file,
+ filter_http_statuses=[200], filter_mimetypes=['application/pdf'],
+ batch_size=30)
+ pusher.run()
+
+def run_extract_zipfile(args):
+ grobid_client = GrobidClient(host_url=args.grobid_host)
+ worker = GrobidBlobWorker(grobid_client, sink=args.sink)
+ pusher = ZipfilePusher(worker, args.zip_file)
+ pusher.run()
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--kafka-mode',
+ action='store_true',
+ help="send output to Kafka (not stdout)")
+ parser.add_argument('--kafka-hosts',
+ default="localhost:9092",
+ help="list of Kafka brokers (host/port) to use")
+ parser.add_argument('--kafka-env',
+ default="dev",
+ help="Kafka topic namespace to use (eg, prod, qa, dev)")
+ parser.add_argument('--grobid-host',
+ default="http://grobid.qa.fatcat.wiki",
+ help="GROBID API host/port")
+ subparsers = parser.add_subparsers()
+
+ sub_extract_json = subparsers.add_parser('extract-json')
+ sub_extract_json.set_defaults(func=run_extract_json)
+ sub_extract_json.add_argument('json_file',
+ help="JSON file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
+
+ sub_extract_cdx = subparsers.add_parser('extract-cdx')
+ sub_extract_cdx.set_defaults(func=run_extract_cdx)
+ sub_extract_cdx.add_argument('cdx_file',
+ help="CDX file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
+
+ sub_extract_zipfile = subparsers.add_parser('extract-zipfile')
+ sub_extract_zipfile.set_defaults(func=run_extract_zipfile)
+ sub_extract_zipfile.add_argument('zip_file',
+ help="zipfile with PDFs to extract",
+ type=str)
+
+ args = parser.parse_args()
+ if not args.__dict__.get("func"):
+ print("tell me what to do!")
+ sys.exit(-1)
+
+ args.sink = None
+ if args.kafka_mode:
+ produce_topic = "sandcrawler-{}.grobid-output-json".format(args.kafka_env)
+ args.sink = KafkaGrobidSink(kafka_hosts=args.kafka_hosts,
+ produce_topic=produce_topic)
+
+ args.func(args)
+
+if __name__ == '__main__':
+ main()
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py
index 0691b6e..39503fc 100644
--- a/python/sandcrawler/__init__.py
+++ b/python/sandcrawler/__init__.py
@@ -1,3 +1,6 @@
-from .grobid import GrobidClient
+from .grobid import GrobidClient, GrobidWorker, GrobidBlobWorker
from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime
+from .workers import KafkaSink, KafkaGrobidSink, JsonLinePusher, CdxLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper
+from .ia import WaybackClient, WaybackError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError
+
diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py
index 0e37c0e..a610404 100644
--- a/python/sandcrawler/grobid.py
+++ b/python/sandcrawler/grobid.py
@@ -1,10 +1,15 @@
import requests
+from collections import Counter
+
+from .workers import SandcrawlerWorker
+from .misc import gen_file_metadata
+from .ia import WaybackClient, WaybackError
class GrobidClient(object):
- def __init__(self, host_uri, **kwargs):
- self.host_uri = host_uri
+ def __init__(self, host_url="http://grobid.qa.fatcat.wiki", **kwargs):
+ self.host_url = host_url
self.consolidate_mode = int(kwargs.get('consolidate_mode', 1))
def process_fulltext(self, blob, consolidate_mode=None):
@@ -23,7 +28,7 @@ class GrobidClient(object):
consolidate_mode = self.consolidate_mode
grobid_response = requests.post(
- self.host_uri + "/api/processFulltextDocument",
+ self.host_url + "/api/processFulltextDocument",
files={
'input': blob,
'consolidate_mode': self.consolidate_mode,
@@ -42,3 +47,58 @@ class GrobidClient(object):
info['error_msg'] = grobid_response.text[:10000]
return info
+class GrobidWorker(SandcrawlerWorker):
+
+ def __init__(self, grobid_client, wayback_client=None, sink=None, **kwargs):
+ super().__init__()
+ self.grobid_client = grobid_client
+ self.wayback_client = wayback_client
+ self.sink = sink
+ self.consolidate_mode = 1
+
+ def process(self, record):
+ if record.get('warc_path') and record.get('warc_offset'):
+ # it's a full CDX dict. fetch using WaybackClient
+ if not self.wayback_client:
+ raise Exception("wayback client not configured for this GrobidWorker")
+ blob = self.wayback_client.fetch_warc_content(record['warc_path'],
+ record['warc_offset'], record['warc_csize'])
+ elif record.get('url') and record.get('datetime'):
+ # it's a partial CDX dict or something? fetch using WaybackClient
+ if not self.wayback_client:
+ raise Exception("wayback client not configured for this GrobidWorker")
+ blob = self.wayback_client.fetch_url_datetime(record['url'], record['datetime'])
+ elif record.get('item') and record.get('path'):
+ # it's petabox link; fetch via HTTP
+ resp = requests.get("https://archive.org/serve/{}/{}".format(
+ record['item'], record['path']))
+ resp.raise_for_status()
+ blob = resp.body
+ else:
+ raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+ assert blob
+ result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode)
+ result['file_meta'] = gen_file_metadata(blob)
+ result['source'] = record
+ result['key'] = result['file_meta']['sha1hex']
+ return result
+
+class GrobidBlobWorker(SandcrawlerWorker):
+ """
+ This is sort of like GrobidWorker, except it receives blobs directly,
+ instead of fetching blobs from some remote store.
+ """
+
+ def __init__(self, grobid_client, sink=None, **kwargs):
+ super().__init__()
+ self.grobid_client = grobid_client
+ self.sink = sink
+ self.consolidate_mode = 1
+
+ def process(self, blob):
+ assert blob
+ result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode)
+ result['file_meta'] = gen_file_metadata(blob)
+ result['key'] = result['file_meta']['sha1hex']
+ return result
+
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
new file mode 100644
index 0000000..365cf82
--- /dev/null
+++ b/python/sandcrawler/ia.py
@@ -0,0 +1,135 @@
+
+# XXX: some broken MRO thing going on in here due to python3 object wrangling
+# in `wayback` library. Means we can't run pylint.
+# pylint: skip-file
+
+import os, sys
+import requests
+
+import wayback.exception
+from http.client import IncompleteRead
+from wayback.resourcestore import ResourceStore
+from gwb.loader import CDXLoaderFactory
+
+class CdxApiError(Exception):
+ pass
+
+class CdxApiClient:
+
+ def __init__(self, host_url="https://web.archive.org/cdx/search/cdx"):
+ self.host_url = host_url
+
+ def lookup_latest(self, url):
+ """
+ Looks up most recent HTTP 200 record for the given URL.
+
+ Returns a CDX dict, or None if not found.
+
+ XXX: should do authorized lookup using cookie to get all fields
+ """
+
+ resp = requests.get(self.host_url, params={
+ 'url': url,
+ 'matchType': 'exact',
+ 'limit': -1,
+ 'filter': 'statuscode:200',
+ 'output': 'json',
+ })
+ if resp.status_code != 200:
+ raise CDXApiError(resp.text)
+ rj = resp.json()
+ if len(rj) <= 1:
+ return None
+ cdx = rj[1]
+ assert len(cdx) == 7 # JSON is short
+ cdx = dict(
+ surt=cdx[0],
+ datetime=cdx[1],
+ url=cdx[2],
+ mimetype=cdx[3],
+ http_status=int(cdx[4]),
+ sha1b32=cdx[5],
+ sha1hex=b32_hex(cdx[5]),
+ )
+ return cdx
+
+
+class WaybackError(Exception):
+ pass
+
+class WaybackClient:
+
+ def __init__(self, cdx_client=None, **kwargs):
+ if cdx_client:
+ self.cdx_client = cdx_client
+ else:
+ self.cdx_client = CdxApiClient()
+ # /serve/ instead of /download/ doesn't record view count
+ self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/')
+ # gwb library will fall back to reading from /opt/.petabox/webdata.secret
+ self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET'))
+ self.warc_uri_prefix = kwargs.get('warc_uri_prefix', 'https://archive.org/serve/')
+ self.rstore = None
+
+ def fetch_warc_content(self, warc_path, offset, c_size):
+ warc_uri = self.warc_uri_prefix + warc_path
+ if not self.rstore:
+ self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory(
+ webdata_secret=self.petabox_webdata_secret,
+ download_base_url=self.petabox_base_url))
+ try:
+ gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
+ except wayback.exception.ResourceUnavailable:
+ raise WaybackError("failed to load file contents from wayback/petabox (ResourceUnavailable)")
+ except ValueError as ve:
+ raise WaybackError("failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
+ except EOFError as eofe:
+ raise WaybackError("failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
+ except TypeError as te:
+ raise WaybackError("failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
+ # Note: could consider a generic "except Exception" here, as we get so
+ # many petabox errors. Do want jobs to fail loud and clear when the
+ # whole cluster is down though.
+
+ if gwb_record.get_status()[0] != 200:
+ raise WaybackError("archived HTTP response (WARC) was not 200: {}".format(gwb_record.get_status()[0]))
+
+ try:
+ raw_content = gwb_record.open_raw_content().read()
+ except IncompleteRead as ire:
+ raise WaybackError("failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
+ return raw_content
+
+ def fetch_url_datetime(self, url, datetime):
+ cdx_row = self.cdx_client.lookup(url, datetime)
+ return self.fetch_warc_content(
+ cdx_row['warc_path'],
+ cdx_row['warc_offset'],
+ cdx_row['warc_csize'])
+
+
+class SavePageNowError(Exception):
+ pass
+
+class SavePageNowClient:
+
+ def __init__(self, cdx_client=None, endpoint="https://web.archive.org/save/"):
+ if cdx_client:
+ self.cdx_client = cdx_client
+ else:
+ self.cdx_client = CdxApiClient()
+ self.endpoint = endpoint
+
+ def save_url_now(self, url):
+ """
+ Returns a tuple (cdx, blob) on success, or raises an error on non-success.
+
+ XXX: handle redirects?
+ """
+ resp = requests.get(self.endpoint + url)
+ if resp.status_code != 200:
+ raise SavePageNowError("HTTP status: {}, url: {}".format(resp.status_code, url))
+ body = resp.content
+ cdx = self.cdx_client.lookup_latest(url)
+ return (cdx, body)
+
diff --git a/python/sandcrawler/misc.py b/python/sandcrawler/misc.py
index f741f93..4ffc5d7 100644
--- a/python/sandcrawler/misc.py
+++ b/python/sandcrawler/misc.py
@@ -75,6 +75,11 @@ def test_normalize_mime():
def parse_cdx_line(raw_cdx, normalize=True):
+ """
+ This method always filters a few things out:
+
+ - non-HTTP requests, based on lack of status code (eg, whois)
+ """
cdx = raw_cdx.split()
if len(cdx) < 11:
@@ -84,8 +89,6 @@ def parse_cdx_line(raw_cdx, normalize=True):
dt = cdx[1]
url = cdx[2]
mime = normalize_mime(cdx[3])
- if normalize:
- mime = normalize_mime(mime)
http_status = cdx[4]
sha1b32 = cdx[5]
c_size = cdx[8]
@@ -102,6 +105,9 @@ def parse_cdx_line(raw_cdx, normalize=True):
if mime is None or mime == '-':
mime = "application/octet-stream"
+ if normalize:
+ mime = normalize_mime(mime)
+
sha1hex = b32_hex(sha1b32)
http_status = int(http_status)
c_size = int(c_size)
@@ -115,9 +121,9 @@ def parse_cdx_line(raw_cdx, normalize=True):
http_status=http_status,
sha1b32=sha1b32,
sha1hex=sha1hex,
- c_size=c_size,
- offset=offset,
- warc=warc,
+ warc_csize=c_size,
+ warc_offset=offset,
+ warc_path=warc,
)
def parse_cdx_datetime(dt_str):
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
new file mode 100644
index 0000000..81813a2
--- /dev/null
+++ b/python/sandcrawler/workers.py
@@ -0,0 +1,419 @@
+
+import sys
+import json
+import zipfile
+import multiprocessing.pool
+from collections import Counter
+from confluent_kafka import Consumer, Producer, KafkaException
+
+from .misc import parse_cdx_line
+
+
+class SandcrawlerWorker(object):
+ """
+ Base class for sandcrawler workers.
+
+ Usually these get "pushed" into by a RecordPusher. Output goes to another
+ worker (pipeline-style), or defaults to stdout.
+ """
+
+ def __init__(self):
+ self.counts = Counter()
+ self.sink = None
+ # TODO: self.counters
+
+ def push_record(self, task):
+ self.counts['total'] += 1
+ result = self.process(task)
+ if not result:
+ self.counts['failed'] += 1
+ return
+ if self.sink:
+ self.sink.push_record(result)
+ self.counts['pushed'] += 1
+ else:
+ print(json.dumps(result))
+ return result
+
+ def push_batch(self, tasks):
+ results = []
+ for task in tasks:
+ results.append(self.push_record(task))
+ return results
+
+ def finish(self):
+ if self.sink:
+ self.sink.finish()
+ sys.stderr.write("Worker: {}\n".format(self.counts))
+ return self.counts
+
+class MultiprocessWrapper(SandcrawlerWorker):
+
+ def __init__(self, worker, sink, jobs=None):
+ self.counts = Counter()
+ self.worker = worker
+ self.sink = sink
+ self.pool = multiprocessing.pool.Pool(jobs)
+
+ def push_batch(self, tasks):
+ self.counts['total'] += len(tasks)
+ sys.stderr.write("... processing batch of: {}\n".format(len(tasks)))
+ results = self.pool.map(self.worker.process, tasks)
+ for result in results:
+ if not result:
+ self.counts['failed'] += 1
+ return
+ if self.sink:
+ self.sink.push_record(result)
+ self.counts['pushed'] += 1
+ else:
+ print(json.dumps(result))
+ return results
+
+ def finish(self):
+ self.pool.terminate()
+ if self.sink:
+ self.sink.finish()
+ worker_counts = self.worker.finish()
+ sys.stderr.write("Multiprocessing: {}\n".format(self.counts))
+ return worker_counts
+
+class BlackholeSink(SandcrawlerWorker):
+ """
+ Dummy SandcrawlerWorker. That doesn't do or process anything.
+
+ Useful for tests.
+ """
+
+ def push_record(self, task):
+ return
+
+ def push_batch(self, tasks):
+ return
+
+class KafkaSink(SandcrawlerWorker):
+
+ def __init__(self, kafka_hosts, produce_topic, **kwargs):
+ self.sink = None
+ self.counts = Counter()
+ self.produce_topic = produce_topic
+ self.kafka_hosts = kafka_hosts
+
+ config = self.producer_config({
+ 'bootstrap.servers': kafka_hosts,
+ 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
+ })
+ self.producer = Producer(config)
+
+
+ @staticmethod
+ def _fail_fast(err, msg):
+ if err is not None:
+ sys.stderr.write("Kafka producer delivery error: {}\n".format(err))
+ sys.stderr.write("Bailing out...\n")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+
+ def producer_config(self, kafka_config):
+ config = kafka_config.copy()
+ config.update({
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'request.required.acks': -1, # all brokers must confirm
+ }
+ })
+ return config
+
+ def push_record(self, msg, key=None):
+ self.counts['total'] += 1
+ if type(msg) == dict:
+ if not key and 'key' in msg:
+ key = msg['key']
+ msg = json.dumps(msg)
+ if type(msg) == str:
+ msg = msg.encode('utf-8')
+ assert type(msg) == bytes
+
+ self.producer.produce(
+ self.produce_topic,
+ msg,
+ key=key,
+ on_delivery=self._fail_fast)
+ self.counts['produced'] += 1
+
+ # TODO: check for errors etc. is this necessary?
+ self.producer.poll(0)
+
+ def push_batch(self, msgs):
+ for m in msgs:
+ self.push_record(m)
+
+ def finish(self):
+ self.producer.flush()
+ return self.counts
+
+
+class KafkaGrobidSink(KafkaSink):
+ """
+ Variant of KafkaSink for large documents. Used for, eg, GROBID output.
+ """
+
+ def producer_config(self, kafka_config):
+ config = kafka_config.copy()
+ config.update({
+ 'compression.codec': 'gzip',
+ 'retry.backoff.ms': 250,
+ 'linger.ms': 5000,
+ 'batch.num.messages': 50,
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'request.required.acks': -1, # all brokers must confirm
+ }
+ })
+ return config
+
+
+class RecordPusher:
+ """
+ Base class for different record sources to be pushed into workers. Pretty
+ trivial interface, just wraps an importer and pushes records in to it.
+ """
+
+ def __init__(self, worker, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+
+ def run(self):
+ """
+ This will look something like:
+
+ for line in sys.stdin:
+ record = json.loads(line)
+ self.worker.push_record(record)
+ print(self.worker.finish())
+ """
+ raise NotImplementedError
+
+
+class JsonLinePusher(RecordPusher):
+
+ def __init__(self, worker, json_file, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+ self.json_file = json_file
+ self.batch_size = kwargs.get('batch_size', None)
+
+ def run(self):
+ batch = []
+ for line in self.json_file:
+ if not line:
+ continue
+ self.counts['total'] += 1
+ record = json.loads(line)
+ if self.batch_size:
+ batch.append(record)
+ if len(batch) > self.batch_size:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ else:
+ self.worker.push_record(record)
+ self.counts['pushed'] += 1
+ if self.batch_size and batch:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ worker_counts = self.worker.finish()
+ sys.stderr.write("JSON lines pushed: {}\n".format(self.counts))
+ return self.counts
+
+
+class CdxLinePusher(RecordPusher):
+
+ def __init__(self, worker, cdx_file, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+ self.cdx_file = cdx_file
+ self.filter_http_statuses = kwargs.get('filter_http_statuses', None)
+ self.filter_mimetypes = kwargs.get('filter_mimetypes', None)
+ self.allow_octet_stream = kwargs.get('allow_octet_stream', False)
+ self.batch_size = kwargs.get('batch_size', None)
+
+ def run(self):
+ batch = []
+ for line in self.cdx_file:
+ if not line:
+ continue
+ self.counts['total'] += 1
+ record = parse_cdx_line(line, normalize=True)
+ if not record:
+ self.counts['skip-parse'] += 1
+ continue
+ if self.filter_http_statuses and record['http_status'] not in self.filter_http_statuses:
+ self.counts['skip-http_status'] += 1
+ continue
+ if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes:
+ self.counts['skip-mimetype'] += 1
+ continue
+ if self.batch_size:
+ batch.append(record)
+ if len(batch) > self.batch_size:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ else:
+ self.worker.push_record(record)
+ self.counts['pushed'] += 1
+ if self.batch_size and batch:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ worker_counts = self.worker.finish()
+ sys.stderr.write("CDX lines pushed: {}\n".format(self.counts))
+ return self.counts
+
+
+class ZipfilePusher(RecordPusher):
+
+ def __init__(self, worker, zipfile_path, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+ self.filter_suffix = ".pdf"
+ self.zipfile_path = zipfile_path
+
+ def run(self):
+ with zipfile.ZipFile(self.zipfile_path, 'r') as archive:
+ for zipinfo in archive.infolist():
+ if not zipinfo.filename.endswith(self.filter_suffix):
+ continue
+ self.counts['total'] += 1
+ # NB doesn't really extract the file, just gives you a stream (file-like-object) for reading it
+ flo = archive.open(zipinfo, 'r')
+ data = flo.read(2**32)
+ flo.close()
+ self.worker.push_record(data)
+ self.counts['pushed'] += 1
+ worker_counts = self.worker.finish()
+ sys.stderr.write("ZIP PDFs pushed: {}\n".format(self.counts))
+ return self.counts
+
+
+class KafkaJsonPusher(RecordPusher):
+
+ def __init__(self, worker, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+ self.consumer = make_kafka_consumer(
+ kafka_hosts,
+ kafka_env,
+ topic_suffix,
+ group,
+ )
+ self.poll_interval = kwargs.get('poll_interval', 5.0)
+ self.batch_size = kwargs.get('batch_size', 100)
+ self.batch_worker = kwargs.get('batch_worker', False)
+
+ def run(self):
+ while True:
+ # TODO: this is batch-oriented, because underlying worker is
+ # often batch-oriented, but this doesn't confirm that entire batch
+ # has been pushed to fatcat before commiting offset. Eg, consider
+ # case where there there is one update and thousands of creates;
+ # update would be lingering in worker, and if worker crashed
+ # never created. Not great.
+ batch = self.consumer.consume(
+ num_messages=self.batch_size,
+ timeout=self.poll_interval)
+ sys.stderr.write("... got {} kafka messages ({}sec poll interval)\n".format(
+ len(batch), self.poll_interval))
+ if not batch:
+ # TODO: could have some larger timeout here and
+ # self.worker.finish() if it's been more than, eg, a couple
+ # minutes
+ continue
+ # first check errors on entire batch...
+ for msg in batch:
+ if msg.error():
+ raise KafkaException(msg.error())
+ # ... then process
+ if self.push_batches:
+ self.counts['total'] += len(batch)
+ records = [json.loads(msg.value().decode('utf-8')) for msg in batch]
+ self.worker.push_batch(records)
+ self.counts['pushed'] += len(batch)
+ sys.stderr.write("Import counts: {}\n".format(self.worker.counts))
+ else:
+ for msg in batch:
+ self.counts['total'] += 1
+ record = json.loads(msg.value().decode('utf-8'))
+ self.worker.push_record(record)
+ self.counts['pushed'] += 1
+ if self.counts['total'] % 500 == 0:
+ sys.stderr.write("Import counts: {}\n".format(self.worker.counts))
+ for msg in batch:
+ # locally store offsets of processed messages; will be
+ # auto-commited by librdkafka from this "stored" value
+ self.consumer.store_offsets(message=msg)
+
+ # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
+ # commit the current batch if it has been lingering
+ worker_counts = self.worker.finish()
+ sys.stderr.write("KafkaJson lines pushed: {}\n".format(self.counts))
+ self.consumer.close()
+ return self.counts
+
+
+def make_kafka_consumer(hosts, env, topic_suffix, group):
+ topic_name = "fatcat-{}.{}".format(env, topic_suffix)
+
+ def fail_fast(err, partitions):
+ if err is not None:
+ sys.stderr.write("Kafka consumer commit error: {}\n".format(err))
+ sys.stderr.write("Bailing out...\n")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ for p in partitions:
+ # check for partition-specific commit errors
+ if p.error:
+ sys.stderr.write("Kafka consumer commit error: {}\n".format(p.error))
+ sys.stderr.write("Bailing out...\n")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ #print("Kafka consumer commit successful")
+ pass
+
+ # previously, using pykafka
+ #auto_commit_enable=True,
+ #auto_commit_interval_ms=30000, # 30 seconds
+ conf = {
+ 'bootstrap.servers': hosts,
+ 'group.id': group,
+ 'on_commit': fail_fast,
+ # messages don't have offset marked as stored until pushed to
+ # elastic, but we do auto-commit stored offsets to broker
+ 'enable.auto.offset.store': False,
+ 'enable.auto.commit': True,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 5min)
+ 'max.poll.interval.ms': 120000,
+ 'default.topic.config': {
+ 'auto.offset.reset': 'latest',
+ },
+ }
+
+ def on_rebalance(consumer, partitions):
+ for p in partitions:
+ if p.error:
+ raise KafkaException(p.error)
+ sys.stderr.write("Kafka partitions rebalanced: {} / {}\n".format(
+ consumer, partitions))
+
+ consumer = Consumer(conf)
+ # NOTE: it's actually important that topic_name *not* be bytes (UTF-8
+ # encoded)
+ consumer.subscribe([topic_name],
+ on_assign=on_rebalance,
+ on_revoke=on_rebalance,
+ )
+ sys.stderr.write("Consuming from kafka topic {}, group {}\n".format(topic_name, group))
+ return consumer
diff --git a/python/tests/test_grobid.py b/python/tests/test_grobid.py
index fca234a..10560cd 100644
--- a/python/tests/test_grobid.py
+++ b/python/tests/test_grobid.py
@@ -3,7 +3,7 @@ import pytest
import struct
import responses
-from sandcrawler import GrobidClient
+from sandcrawler import GrobidClient, GrobidWorker, CdxLinePusher, BlackholeSink, WaybackClient
FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843)
@@ -28,11 +28,10 @@ def test_grobid_503():
assert resp['status_code'] == 503
assert resp['status'] == "error"
- print(resp)
- assert False
@responses.activate
-def test_grobid_503():
+@pytest.mark.skip(reason="XXX: need to fix unicode/bytes something something")
+def test_grobid_success():
client = GrobidClient(host_url="http://localhost:8070")
@@ -51,3 +50,26 @@ def test_grobid_503():
print(type(REAL_TEI_XML))
assert resp['tei_xml'] == REAL_TEI_XML.decode('utf-8')
#assert resp['tei_xml'].split('\n')[:3] == REAL_TEI_XML.split('\n')[:3]
+
+@responses.activate
+def test_grobid_worker_cdx():
+
+ sink = BlackholeSink()
+ grobid_client = GrobidClient(host_url="http://localhost:8070")
+ wayback_client = WaybackClient()
+ worker = GrobidWorker(grobid_client, wayback_client, sink=sink)
+
+ responses.add(responses.POST,
+ 'http://localhost:8070/api/processFulltextDocument', status=200,
+ body=REAL_TEI_XML, content_type='text/xml')
+
+ with open('tests/files/example.cdx', 'r') as cdx_file:
+ pusher = CdxLinePusher(worker, cdx_file,
+ filter_http_statuses=[200], filter_mimetypes=['application/pdf'])
+ pusher_counts = pusher.run()
+ assert pusher_counts['total']
+ assert pusher_counts['pushed'] == 7
+ assert pusher_counts['pushed'] == worker.counts['total']
+
+ assert len(responses.calls) == worker.counts['total']
+
diff --git a/python/tests/test_misc.py b/python/tests/test_misc.py
index 02deec9..420bc07 100644
--- a/python/tests/test_misc.py
+++ b/python/tests/test_misc.py
@@ -50,9 +50,9 @@ def test_parse_cdx_line():
'surt': "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf",
'url': "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf",
'datetime': "20170828233154",
- 'warc': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
- 'offset': 931661233,
- 'c_size': 210251,
+ 'warc_path': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+ 'warc_offset': 931661233,
+ 'warc_csize': 210251,
'http_status': 200,
}