aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xpython/pdftrio_tool.py15
-rw-r--r--python/sandcrawler/pdftrio.py9
2 files changed, 14 insertions, 10 deletions
diff --git a/python/pdftrio_tool.py b/python/pdftrio_tool.py
index 843c214..ec92afe 100755
--- a/python/pdftrio_tool.py
+++ b/python/pdftrio_tool.py
@@ -6,7 +6,7 @@ text extraction.
Example of large parallel run, locally:
- cat /srv/sandcrawler/tasks/something.cdx | pv -l | parallel -j30 --pipe ./pdftrio_tool.py --kafka-env prod --kafka-hosts wbgrp-svc263.us.archive.org:9092,wbgrp-svc284.us.archive.org:9092,wbgrp-svc285.us.archive.org:9092 --kafka-mode --pdftrio-host http://localhost:3939 -j0 classify-pdf-json -
+cat /srv/sandcrawler/tasks/something.cdx | pv -l | parallel -j30 --pipe ./pdftrio_tool.py --kafka-env prod --kafka-hosts wbgrp-svc263.us.archive.org:9092,wbgrp-svc284.us.archive.org:9092,wbgrp-svc285.us.archive.org:9092 --kafka-mode --pdftrio-host http://localhost:3939 -j0 classify-pdf-json -
"""
import sys
@@ -21,11 +21,11 @@ def run_classify_pdf_json(args):
pdftrio_client = PdfTrioClient(host_url=args.pdftrio_host)
wayback_client = WaybackClient()
if args.jobs > 1:
- worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=None)
+ worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=None, mode=args.pdftrio_mode)
multi_worker = MultiprocessWrapper(worker, args.sink)
pusher = JsonLinePusher(multi_worker, args.json_file, batch_size=args.jobs)
else:
- worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=args.sink)
+ worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=args.sink, mode=args.pdftrio_mode)
pusher = JsonLinePusher(worker, args.json_file)
pusher.run()
@@ -33,7 +33,7 @@ def run_classify_pdf_cdx(args):
pdftrio_client = PdfTrioClient(host_url=args.pdftrio_host)
wayback_client = WaybackClient()
if args.jobs > 1:
- worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=None)
+ worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=None, mode=args.pdftrio_mode)
multi_worker = MultiprocessWrapper(worker, args.sink)
pusher = CdxLinePusher(
multi_worker,
@@ -43,7 +43,7 @@ def run_classify_pdf_cdx(args):
batch_size=args.jobs,
)
else:
- worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=args.sink)
+ worker = PdfTrioWorker(pdftrio_client, wayback_client, sink=args.sink, mode=args.pdftrio_mode)
pusher = CdxLinePusher(
worker,
args.cdx_file,
@@ -54,7 +54,7 @@ def run_classify_pdf_cdx(args):
def run_classify_pdf_zipfile(args):
pdftrio_client = PdfTrioClient(host_url=args.pdftrio_host)
- worker = PdfTrioBlobWorker(pdftrio_client, sink=args.sink)
+ worker = PdfTrioBlobWorker(pdftrio_client, sink=args.sink, mode=args.pdftrio_mode)
pusher = ZipfilePusher(worker, args.zip_file)
pusher.run()
@@ -77,6 +77,9 @@ def main():
parser.add_argument('--pdftrio-host',
default="http://pdftrio.qa.fatcat.wiki",
help="pdftrio API host/port")
+ parser.add_argument('--pdftrio-mode',
+ default="auto",
+ help="which classification mode to use")
subparsers = parser.add_subparsers()
sub_classify_pdf_json = subparsers.add_parser('classify-pdf-json',
diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py
index 52d1b8d..7a2e53c 100644
--- a/python/sandcrawler/pdftrio.py
+++ b/python/sandcrawler/pdftrio.py
@@ -13,7 +13,7 @@ class PdfTrioClient(object):
self.host_url = host_url
self.http_session = requests_retry_session(retries=3, backoff_factor=3)
- def classify_pdf(self, blob):
+ def classify_pdf(self, blob, mode="auto"):
"""
Returns a dict with at least:
@@ -30,7 +30,7 @@ class PdfTrioClient(object):
try:
pdftrio_response = requests.post(
- self.host_url + "/classify/research-pub/all",
+ self.host_url + "/classify/research-pub/" + mode,
files={
'pdf_content': blob,
},
@@ -167,10 +167,11 @@ class PdfTrioBlobWorker(SandcrawlerWorker):
instead of fetching blobs from some remote store.
"""
- def __init__(self, pdftrio_client, sink=None, **kwargs):
+ def __init__(self, pdftrio_client, sink=None, mode="auto", **kwargs):
super().__init__()
self.pdftrio_client = pdftrio_client
self.sink = sink
+ self.mode = mode
def process(self, blob):
start_process = time.time()
@@ -179,7 +180,7 @@ class PdfTrioBlobWorker(SandcrawlerWorker):
result = dict()
result['file_meta'] = gen_file_metadata(blob)
result['key'] = result['file_meta']['sha1hex']
- result['pdf_trio'] = self.pdftrio_client.classify_pdf(blob)
+ result['pdf_trio'] = self.pdftrio_client.classify_pdf(blob, mode=mode)
result['timing'] = dict(
pdftrio_sec=result['pdf_trio'].pop('_total_sec', None),
total_sec=time.time() - start_process,