Diffstat (limited to 'python')
-rwxr-xr-x  python/pdftrio_tool.py          | 118
-rwxr-xr-x  python/persist_tool.py          |  18
-rw-r--r--  python/sandcrawler/__init__.py  |   3
-rw-r--r--  python/sandcrawler/db.py        |  57
-rw-r--r--  python/sandcrawler/pdftrio.py   | 158
-rw-r--r--  python/sandcrawler/persist.py   |  21
-rwxr-xr-x  python/sandcrawler_worker.py    |  19
7 files changed, 393 insertions(+), 1 deletion(-)
diff --git a/python/pdftrio_tool.py b/python/pdftrio_tool.py
new file mode 100755
index 0000000..843c214
--- /dev/null
+++ b/python/pdftrio_tool.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python3
+
+"""
+Basically just a copy of grobid_tool.py, but for PDF classification instead of
+text extraction.
+
+Example of large parallel run, locally:
+
+ cat /srv/sandcrawler/tasks/something.cdx | pv -l | parallel -j30 --pipe ./pdftrio_tool.py --kafka-env prod --kafka-hosts wbgrp-svc263.us.archive.org:9092,wbgrp-svc284.us.archive.org:9092,wbgrp-svc285.us.archive.org:9092 --kafka-mode --pdftrio-host http://localhost:3939 -j0 classify-pdf-json -
+"""
+
+import sys
+import json
+import argparse
+import datetime
+
+from sandcrawler import *
+
+
+def run_classify_pdf_json(args):
+ pdftrio_client = PdfTrioClient(host_url=args.pdftrio_host)
+ wayback_client = WaybackClient()
+ if args.jobs > 1:
+ worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=None)
+ multi_worker = MultiprocessWrapper(worker, args.sink)
+ pusher = JsonLinePusher(multi_worker, args.json_file, batch_size=args.jobs)
+ else:
+ worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=args.sink)
+ pusher = JsonLinePusher(worker, args.json_file)
+ pusher.run()
+
+def run_classify_pdf_cdx(args):
+ pdftrio_client = PdfTrioClient(host_url=args.pdftrio_host)
+ wayback_client = WaybackClient()
+ if args.jobs > 1:
+ worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=None)
+ multi_worker = MultiprocessWrapper(worker, args.sink)
+ pusher = CdxLinePusher(
+ multi_worker,
+ args.cdx_file,
+ filter_http_statuses=[200, 226],
+ filter_mimetypes=['application/pdf'],
+ batch_size=args.jobs,
+ )
+ else:
+ worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=args.sink)
+ pusher = CdxLinePusher(
+ worker,
+ args.cdx_file,
+ filter_http_statuses=[200, 226],
+ filter_mimetypes=['application/pdf'],
+ )
+ pusher.run()
+
+def run_classify_pdf_zipfile(args):
+ pdftrio_client = PdfTrioClient(host_url=args.pdftrio_host)
+ worker = PdfTrioBlobWorker(pdftrio_client, sink=args.sink)
+ pusher = ZipfilePusher(worker, args.zip_file)
+ pusher.run()
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser.add_argument('--kafka-mode',
+ action='store_true',
+ help="send output to Kafka (not stdout)")
+ parser.add_argument('--kafka-hosts',
+ default="localhost:9092",
+ help="list of Kafka brokers (host/port) to use")
+ parser.add_argument('--kafka-env',
+ default="dev",
+ help="Kafka topic namespace to use (eg, prod, qa, dev)")
+ parser.add_argument('-j', '--jobs',
+ default=8, type=int,
+ help="parallelism for batch CPU jobs")
+ parser.add_argument('--pdftrio-host',
+ default="http://pdftrio.qa.fatcat.wiki",
+ help="pdftrio API host/port")
+ subparsers = parser.add_subparsers()
+
+ sub_classify_pdf_json = subparsers.add_parser('classify-pdf-json',
+ help="for each JSON line with CDX info, fetches PDF and does pdftrio classify_pdfion")
+ sub_classify_pdf_json.set_defaults(func=run_classify_pdf_json)
+ sub_classify_pdf_json.add_argument('json_file',
+ help="JSON file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
+
+ sub_classify_pdf_cdx = subparsers.add_parser('classify-pdf-cdx',
+ help="for each CDX line, fetches PDF and does pdftrio classify_pdfion")
+ sub_classify_pdf_cdx.set_defaults(func=run_classify_pdf_cdx)
+ sub_classify_pdf_cdx.add_argument('cdx_file',
+ help="CDX file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
+
+ sub_classify_pdf_zipfile = subparsers.add_parser('classify-pdf-zipfile',
+ help="opens zipfile, iterates over PDF files inside and does pdftrio classify_pdf for each")
+ sub_classify_pdf_zipfile.set_defaults(func=run_classify_pdf_zipfile)
+ sub_classify_pdf_zipfile.add_argument('zip_file',
+ help="zipfile with PDFs to classify",
+ type=str)
+
+ args = parser.parse_args()
+ if not args.__dict__.get("func"):
+ print("tell me what to do!")
+ sys.exit(-1)
+
+ args.sink = None
+ if args.kafka_mode:
+ produce_topic = "sandcrawler-{}.pdftrio-output".format(args.kafka_env)
+ print("Running in kafka output mode, publishing to {}\n".format(produce_topic))
+ args.sink = KafkaSink(kafka_hosts=args.kafka_hosts,
+ produce_topic=produce_topic)
+
+ args.func(args)
+
+if __name__ == '__main__':
+ main()
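
Note: for reference, a minimal single-process sketch of the wiring that run_classify_pdf_json() above sets up, with no Kafka sink (per the --kafka-mode help, results go to stdout when no sink is configured). The localhost pdftrio URL is just the placeholder from the docstring example; it assumes the sandcrawler package from this repo is importable.

    import sys

    from sandcrawler import PdfTrioClient, PdfTrioWorker, WaybackClient, JsonLinePusher

    # client for the pdf-trio HTTP API (placeholder host, as in the docstring example)
    pdftrio_client = PdfTrioClient(host_url="http://localhost:3939")
    # fetches PDF bytes out of wayback/petabox for each CDX record
    wayback_client = WaybackClient()

    # sink=None: result JSON is written to stdout instead of being pushed to Kafka
    worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=None)

    # read JSON lines (each with CDX info) from stdin and classify each PDF
    JsonLinePusher(worker, sys.stdin).run()
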
diff --git a/python/persist_tool.py b/python/persist_tool.py
index 7187719..80b1156 100755
--- a/python/persist_tool.py
+++ b/python/persist_tool.py
@@ -63,6 +63,17 @@ def run_grobid_disk(args):
)
pusher.run()
+def run_pdftrio(args):
+ worker = PersistPdfTrioWorker(
+ db_url=args.db_url,
+ )
+ pusher = JsonLinePusher(
+ worker,
+ args.json_file,
+ batch_size=100,
+ )
+ pusher.run()
+
def run_ingest_file_result(args):
worker = PersistIngestFileResultWorker(
db_url=args.db_url,
@@ -124,6 +135,13 @@ def main():
help="base directory to output into",
type=str)
+ sub_pdftrio = subparsers.add_parser('pdftrio',
+ help="backfill a pdftrio JSON ('pg') dump into postgresql and s3 (minio)")
+ sub_pdftrio.set_defaults(func=run_pdftrio)
+ sub_pdftrio.add_argument('json_file',
+ help="pdftrio file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
+
sub_ingest_file_result = subparsers.add_parser('ingest-file-result',
help="backfill a ingest_file_result JSON dump into postgresql")
sub_ingest_file_result.set_defaults(func=run_ingest_file_result)
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py
index 2d28829..b52d039 100644
--- a/python/sandcrawler/__init__.py
+++ b/python/sandcrawler/__init__.py
@@ -1,9 +1,10 @@
from .grobid import GrobidClient, GrobidWorker, GrobidBlobWorker
+from .pdftrio import PdfTrioClient, PdfTrioWorker, PdfTrioBlobWorker
from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime
from .workers import KafkaSink, KafkaGrobidSink, JsonLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper
from .ia import WaybackClient, WaybackError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, PetaboxError, ResourceResult, WarcResource, CdxPartial, CdxRow
from .ingest import IngestFileWorker
-from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker
+from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker
from .db import SandcrawlerPostgrestClient, SandcrawlerPostgresClient
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index 53d159f..ddb71a0 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -25,6 +25,15 @@ class SandcrawlerPostgrestClient:
else:
return None
+ def get_pdftrio(self, sha1):
+ resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.'+sha1))
+ resp.raise_for_status()
+ resp = resp.json()
+ if resp:
+ return resp[0]
+ else:
+ return None
+
def get_file_meta(self, sha1):
resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1))
resp.raise_for_status()
@@ -176,6 +185,54 @@ class SandcrawlerPostgresClient:
resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
+ def insert_pdftrio(self, cur, batch, on_conflict="nothing"):
+ sql = """
+ INSERT INTO
+ pdftrio (sha1hex, updated, status_code, status, pdftrio_version,
+ models_date, ensemble_score, bert_score, linear_score,
+ image_score)
+ VALUES %s
+ ON CONFLICT (sha1hex) DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ updated=EXCLUDED.updated,
+ status_code=EXCLUDED.status_code,
+ status=EXCLUDED.status,
+ pdftrio_version=EXCLUDED.pdftrio_version,
+ models_date=EXCLUDED.models_date,
+ ensemble_score=EXCLUDED.ensemble_score,
+ bert_score=EXCLUDED.bert_score,
+ linear_score=EXCLUDED.linear_score,
+ image_score=EXCLUDED.image_score
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+ batch = [
+ (
+ d['key'],
+ d.get('updated') or datetime.datetime.now(),
+ d['status_code'],
+ d['status'],
+ d.get('versions', {}).get('pdftrio_version') or None,
+ d.get('versions', {}).get('models_date') or None,
+ d.get('ensemble_score') or None,
+ d.get('bert_score') or None,
+ d.get('linear_score') or None,
+ d.get('image_score') or None,
+ )
+ for d in batch]
+ # filter out duplicate rows by key (sha1hex)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[b[0]] = b
+ batch = list(batch_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
+
def insert_ingest_request(self, cur, batch, on_conflict="nothing"):
sql = """
INSERT INTO
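
Note: a small sketch of calling the new insert_pdftrio() upsert directly, outside the persist worker. The database URL and field values are placeholders; the record keys mirror the batch-mapping code above, and on_conflict="update" exercises the ON CONFLICT (sha1hex) DO UPDATE branch.

    from sandcrawler import SandcrawlerPostgresClient

    db = SandcrawlerPostgresClient("postgres:///sandcrawler")  # placeholder DB URL
    cur = db.conn.cursor()

    record = {
        'key': "0" * 40,          # sha1hex of the PDF (placeholder value)
        'status': "success",
        'status_code': 200,
        'ensemble_score': 0.96,   # made-up score
        'versions': {},           # pdftrio_version / models_date, if known
    }
    inserts, updates = db.insert_pdftrio(cur, [record], on_conflict="update")
    db.commit()
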
diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py
new file mode 100644
index 0000000..a2eedd8
--- /dev/null
+++ b/python/sandcrawler/pdftrio.py
@@ -0,0 +1,158 @@
+
+import requests
+
+from .workers import SandcrawlerWorker
+from .misc import gen_file_metadata, requests_retry_session
+from .ia import WaybackClient, WaybackError, PetaboxError
+
+
+class PdfTrioClient(object):
+
+ def __init__(self, host_url="http://pdftrio.qa.fatcat.wiki", **kwargs):
+ self.host_url = host_url
+ self.http_session = requests_retry_session(retries=3, backoff_factor=3)
+
+ def classify_pdf(self, blob):
+ """
+ Returns a dict with at least:
+
+ - status_code (int, always set)
+ - status (success, or error-*)
+
+ On success, the other remote API JSON response keys are also included.
+
+ On HTTP-level failures, the status_code and status field are set
+ appropriately; an optional `error_msg` may also be set. For some other
+ errors, like connection failure, an exception is raised.
+ """
+ assert blob
+
+ try:
+ pdftrio_response = self.http_session.post(
+ self.host_url + "/classify/pdf/all",
+ files={
+ 'pdf': blob,
+ },
+ timeout=30.0,
+ )
+ except requests.Timeout:
+ return {
+ 'status': 'error-timeout',
+ 'status_code': -4, # heritrix3 "HTTP timeout" code
+ 'error_msg': 'pdftrio request (HTTP POST) timeout',
+ }
+
+ info = dict(
+ status_code=pdftrio_response.status_code,
+ )
+ if pdftrio_response.status_code == 200:
+ resp_json = pdftrio_response.json()
+ assert 'ensemble_score' in resp_json
+ assert 'status' in resp_json
+ assert 'versions' in resp_json
+ info.update(resp_json)
+ else:
+ info['status'] = 'error'
+ # TODO: might return JSON with some info?
+
+ # add this timing info at end so it isn't clobbered by an update()
+ if not info.get('timing'):
+ info['timing'] = dict()
+ info['timing']['total_sec'] = pdftrio_response.elapsed.total_seconds()
+ return info
+
+
+class PdfTrioWorker(SandcrawlerWorker):
+ """
+ This class is basically copied directly from GrobidWorker
+ """
+
+ def __init__(self, pdftrio_client, wayback_client=None, sink=None, **kwargs):
+ super().__init__()
+ self.pdftrio_client = pdftrio_client
+ self.wayback_client = wayback_client
+ self.sink = sink
+
+ def process(self, record):
+ default_key = record['sha1hex']
+ if record.get('warc_path') and record.get('warc_offset'):
+ # it's a full CDX dict. fetch using WaybackClient
+ if not self.wayback_client:
+ raise Exception("wayback client not configured for this PdfTrioWorker")
+ try:
+ blob = self.wayback_client.fetch_petabox_body(
+ csize=record['warc_csize'],
+ offset=record['warc_offset'],
+ warc_path=record['warc_path'],
+ )
+ except (WaybackError, PetaboxError) as we:
+ return dict(
+ status="error-wayback",
+ error_msg=str(we),
+ source=record,
+ key=default_key,
+ )
+ elif record.get('url') and record.get('datetime'):
+ # it's a partial CDX dict or something? fetch using WaybackClient
+ if not self.wayback_client:
+ raise Exception("wayback client not configured for this PdfTrioWorker")
+ try:
+ blob = self.wayback_client.fetch_replay_body(
+ url=record['url'],
+ datetime=record['datetime'],
+ )
+ except WaybackError as we:
+ return dict(
+ status="error-wayback",
+ error_msg=str(we),
+ source=record,
+ key=default_key,
+ )
+ elif record.get('item') and record.get('path'):
+ # it's petabox link; fetch via HTTP
+ resp = requests.get("https://archive.org/serve/{}/{}".format(
+ record['item'], record['path']))
+ try:
+ resp.raise_for_status()
+ except Exception as e:
+ return dict(
+ status="error-petabox",
+ error_msg=str(e),
+ source=record,
+ key=default_key,
+ )
+ blob = resp.content
+ else:
+ raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+ if not blob:
+ return dict(
+ status="error",
+ error_msg="empty blob",
+ source=record,
+ key=default_key,
+ )
+ result = self.pdftrio_client.classify_pdf(blob)
+ result['file_meta'] = gen_file_metadata(blob)
+ result['source'] = record
+ result['key'] = result['file_meta']['sha1hex']
+ return result
+
+class PdfTrioBlobWorker(SandcrawlerWorker):
+ """
+ This is sort of like PdfTrioWorker, except it receives blobs directly,
+ instead of fetching blobs from some remote store.
+ """
+
+ def __init__(self, pdftrio_client, sink=None, **kwargs):
+ super().__init__()
+ self.pdftrio_client = pdftrio_client
+ self.sink = sink
+
+ def process(self, blob):
+ if not blob:
+ return None
+ result = self.pdftrio_client.classify_pdf(blob)
+ result['file_meta'] = gen_file_metadata(blob)
+ result['key'] = result['file_meta']['sha1hex']
+ return result
+
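
Note: a rough sketch of classifying a single local PDF with PdfTrioBlobWorker. The file path is a placeholder, and the result fields named in the comments come from the classify_pdf() docstring and the pdftrio table columns; not all of them are guaranteed on every response.

    from sandcrawler import PdfTrioClient, PdfTrioBlobWorker

    client = PdfTrioClient()  # defaults to http://pdftrio.qa.fatcat.wiki
    worker = PdfTrioBlobWorker(client, sink=None)

    with open("some.pdf", "rb") as f:   # placeholder path
        result = worker.process(f.read())

    # Always present: 'status', 'status_code', plus the locally computed
    # 'file_meta' and 'key' (sha1hex of the blob). On success the API adds
    # 'ensemble_score' and 'versions', and may add 'bert_score',
    # 'linear_score', and 'image_score'.
    print(result['status_code'], result.get('ensemble_score'))
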
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 6469940..64b2022 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -309,3 +309,24 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
self.counts['written'] += 1
return record
+
+class PersistPdfTrioWorker(SandcrawlerWorker):
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+
+ def process(self, record):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+ resp = self.db.insert_pdftrio(self.cur, batch)
+ self.counts['insert-pdftrio'] += resp[0]
+ self.counts['update-pdftrio'] += resp[1]
+ self.db.commit()
+ return []
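
Note: the new PersistPdfTrioWorker only implements the batch path; here is a short sketch of driving it directly (in production, KafkaJsonPusher does this with push_batches=True, see sandcrawler_worker.py below). The db_url and record values are placeholders.

    from sandcrawler import PersistPdfTrioWorker

    worker = PersistPdfTrioWorker(db_url="postgres:///sandcrawler")  # placeholder

    batch = [{
        'key': "0" * 40,         # sha1hex (placeholder)
        'status': "success",
        'status_code': 200,
        'ensemble_score': 0.91,  # made-up score
    }]
    worker.push_batch(batch)     # one INSERT ... ON CONFLICT per batch, then commit
    print(worker.counts)         # total / insert-pdftrio / update-pdftrio tallies
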
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index f13116a..02d075c 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -69,6 +69,21 @@ def run_persist_grobid(args):
)
pusher.run()
+def run_persist_pdftrio(args):
+ consume_topic = "sandcrawler-{}.pdftrio-output".format(args.env)
+ worker = PersistPdfTrioWorker(
+ db_url=args.db_url,
+ )
+ pusher = KafkaJsonPusher(
+ worker=worker,
+ kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic,
+ group="persist-pdftrio",
+ push_batches=True,
+ batch_size=100,
+ )
+ pusher.run()
+
def run_ingest_file(args):
if args.bulk:
consume_group = "sandcrawler-{}-ingest-file-bulk".format(args.env)
@@ -158,6 +173,10 @@ def main():
help="only upload TEI-XML to S3 (don't write to database)")
sub_persist_grobid.set_defaults(func=run_persist_grobid)
+ sub_persist_pdftrio = subparsers.add_parser('persist-pdftrio',
+ help="daemon that consumes pdftrio output from Kafka and pushes to postgres")
+ sub_persist_pdftrio.set_defaults(func=run_persist_pdftrio)
+
sub_ingest_file = subparsers.add_parser('ingest-file',
help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka")
sub_ingest_file.add_argument('--bulk',