aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-06-17 11:12:59 -0700
committerBryan Newbold <bnewbold@archive.org>2020-06-17 11:12:59 -0700
commitd2fb570038ced65e6890e689e900a0f1aaed917c (patch)
tree706a7f6107ae30c21d783773fa5d418f18d2aac6
parent82c7ec45dfbaa83e3b29b968846016cc6ae8e87f (diff)
downloadsandcrawler-d2fb570038ced65e6890e689e900a0f1aaed917c.tar.gz
sandcrawler-d2fb570038ced65e6890e689e900a0f1aaed917c.zip
add new pdf workers/persisters
-rwxr-xr-xpython/persist_tool.py30
-rw-r--r--python/sandcrawler/__init__.py4
-rw-r--r--python/sandcrawler/persist.py99
-rwxr-xr-xpython/sandcrawler_worker.py83
4 files changed, 214 insertions, 2 deletions
diff --git a/python/persist_tool.py b/python/persist_tool.py
index 869af06..4d78314 100755
--- a/python/persist_tool.py
+++ b/python/persist_tool.py
@@ -75,6 +75,23 @@ def run_pdftrio(args):
)
pusher.run()
+def run_pdftext(args):
+ worker = PersistPdfTextWorker(
+ db_url=args.db_url,
+ s3_url=args.s3_url,
+ s3_bucket=args.s3_bucket,
+ s3_access_key=args.s3_access_key,
+ s3_secret_key=args.s3_secret_key,
+ s3_only=args.s3_only,
+ db_only=args.db_only,
+ )
+ pusher = JsonLinePusher(
+ worker,
+ args.json_file,
+ batch_size=50,
+ )
+ pusher.run()
+
def run_ingest_file_result(args):
worker = PersistIngestFileResultWorker(
db_url=args.db_url,
@@ -140,6 +157,19 @@ def main():
action='store_true',
help="only write status to sandcrawler-db (don't save TEI-XML to S3)")
+ sub_pdftext = subparsers.add_parser('pdftext',
+ help="backfill a pdftext JSON ('pg') dump into postgresql and s3 (minio)")
+ sub_pdftext.set_defaults(func=run_pdftext)
+ sub_pdftext.add_argument('json_file',
+ help="pdftext file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
+ sub_pdftext.add_argument('--s3-only',
+ action='store_true',
+ help="only upload TEI-XML to S3 (don't write to database)")
+ sub_pdftext.add_argument('--db-only',
+ action='store_true',
+ help="only write status to sandcrawler-db (don't save TEI-XML to S3)")
+
sub_grobid_disk = subparsers.add_parser('grobid-disk',
help="dump GRBOID output to (local) files on disk")
sub_grobid_disk.set_defaults(func=run_grobid_disk)
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py
index 2e5efd7..a01d1f8 100644
--- a/python/sandcrawler/__init__.py
+++ b/python/sandcrawler/__init__.py
@@ -5,6 +5,6 @@ from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime
from .workers import KafkaSink, KafkaCompressSink, JsonLinePusher, CdxLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper
from .ia import WaybackClient, WaybackError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, PetaboxError, ResourceResult, WarcResource, CdxPartial, CdxRow
from .ingest import IngestFileWorker
-from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker, PersistIngestRequestWorker
+from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker, PersistIngestRequestWorker, PersistPdfTextWorker, PersistThumbnailWorker
from .db import SandcrawlerPostgrestClient, SandcrawlerPostgresClient
-from .pdf import PdfExtractWorker
+from .pdf import PdfExtractWorker, PdfExtractBlobWorker
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 338cdfc..196c4b9 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -372,3 +372,102 @@ class PersistPdfTrioWorker(SandcrawlerWorker):
self.db.commit()
return []
+
+
+class PersistPdfTextWorker(SandcrawlerWorker):
+ """
+ Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table.
+
+ Should keep batch sizes small.
+ """
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+ self.s3 = SandcrawlerMinioClient(
+ host_url=kwargs.get('s3_url', 'localhost:9000'),
+ access_key=kwargs['s3_access_key'],
+ secret_key=kwargs['s3_secret_key'],
+ default_bucket=kwargs['s3_bucket'],
+ )
+ self.s3_only = kwargs.get('s3_only', False)
+ self.db_only = kwargs.get('db_only', False)
+ assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed"
+
+ def process(self, record, key=None):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+
+ for r in batch:
+ if r['status'] != 'success' or not r.get('text'):
+ self.counts['s3-skip-status'] += 1
+ if r.get('error_msg'):
+ r['metadata'] = {'error_msg': r['error_msg'][:500]}
+ continue
+
+ assert len(r['sha1hex']) == 40
+ if not self.db_only:
+ resp = self.s3.put_blob(
+ folder="text",
+ blob=r['text'],
+ sha1hex=r['sha1hex'],
+ extension=".txt",
+ )
+ self.counts['s3-put'] += 1
+
+ if not self.s3_only:
+ resp = self.db.insert_pdf_meta(self.cur, batch, on_conflict="update")
+ self.counts['insert-pdf-meta'] += resp[0]
+ self.counts['update-pdf-meta'] += resp[1]
+
+ file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
+ resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update")
+ self.counts['insert-file-meta'] += resp[0]
+ self.counts['update-file-meta'] += resp[1]
+
+ self.db.commit()
+
+ return []
+
+
+class PersistThumbnailWorker(SandcrawlerWorker):
+ """
+ Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table.
+
+ This worker *must* be used with raw kakfa mode.
+ """
+
+ def __init__(self, **kwargs):
+ super().__init__()
+ self.s3 = SandcrawlerMinioClient(
+ host_url=kwargs.get('s3_url', 'localhost:9000'),
+ access_key=kwargs['s3_access_key'],
+ secret_key=kwargs['s3_secret_key'],
+ default_bucket=kwargs['s3_bucket'],
+ )
+ self.s3_extension = kwargs.get('s3_extension', ".jpg")
+
+ def process(self, blob, key=None):
+ """
+ Processing raw messages, not decoded JSON objects
+ """
+
+ assert key is not None and len(key) == 40
+ assert isinstance(blob, bytes)
+ assert len(blob) >= 50
+
+ resp = self.s3.put_blob(
+ folder="thumbnail",
+ blob=blob,
+ sha1hex=key,
+ extension=self.s3_extension,
+ )
+ self.counts['s3-put'] += 1
+ return True
+
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index 5720f48..950eb4b 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -70,6 +70,75 @@ def run_persist_grobid(args):
)
pusher.run()
+def run_pdf_extract(args):
+ consume_topic = "sandcrawler-{}.unextracted-pg".format(args.env)
+ text_topic = "sandcrawler-{}.pdftext".format(args.env)
+ thumbnail_topic = "sandcrawler-{}.thumbnail-180px-jpeg".format(args.env)
+ text_sink = KafkaSink(
+ kafka_hosts=args.kafka_hosts,
+ produce_topic=text_topic,
+ )
+ thumbnail_sink = KafkaSink(
+ kafka_hosts=args.kafka_hosts,
+ produce_topic=thumbnail_topic,
+ )
+ wayback_client = WaybackClient(
+ host_url=args.grobid_host,
+ )
+ worker = PdfExtractWorker(
+ wayback_client=wayback_client,
+ sink=text_sink,
+ thumbnail_sink=thumbnail_sink,
+ )
+ pusher = KafkaJsonPusher(
+ worker=worker,
+ kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic,
+ group="pdf-extract",
+ batch_size=1,
+ )
+ pusher.run()
+
+def run_persist_pdftext(args):
+ consume_topic = "sandcrawler-{}.pdftext".format(args.env)
+ worker = PersistPdfTextWorker(
+ db_url=args.db_url,
+ s3_url=args.s3_url,
+ s3_bucket=args.s3_bucket,
+ s3_access_key=args.s3_access_key,
+ s3_secret_key=args.s3_secret_key,
+ s3_only=args.s3_only,
+ db_only=args.db_only,
+ )
+ pusher = KafkaJsonPusher(
+ worker=worker,
+ kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic,
+ group="persist-pdftext",
+ push_batches=True,
+ batch_size=25,
+ )
+ pusher.run()
+
+def run_persist_thumbnail(args):
+ consume_topic = "sandcrawler-{}.thumbnail".format(args.env)
+ worker = PersistThumbnailWorker(
+ s3_url=args.s3_url,
+ s3_bucket=args.s3_bucket,
+ s3_access_key=args.s3_access_key,
+ s3_secret_key=args.s3_secret_key,
+ # TODO: s3_extension=args.s3_extension,
+ )
+ pusher = KafkaJsonPusher(
+ worker=worker,
+ kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic,
+ group="persist-thumbnail",
+ raw_records=True,
+ batch_size=25,
+ )
+ pusher.run()
+
def run_persist_pdftrio(args):
consume_topic = "sandcrawler-{}.pdftrio-output".format(args.env)
worker = PersistPdfTrioWorker(
@@ -179,6 +248,20 @@ def main():
help="only write status to database (don't upload TEI-XML to S3)")
sub_persist_grobid.set_defaults(func=run_persist_grobid)
+ sub_persist_pdftext = subparsers.add_parser('persist-pdftext',
+ help="daemon that consumes pdftext output from Kafka and pushes to minio and postgres")
+ sub_persist_pdftext.add_argument('--s3-only',
+ action='store_true',
+ help="only upload TEI-XML to S3 (don't write to database)")
+ sub_persist_pdftext.add_argument('--db-only',
+ action='store_true',
+ help="only write status to database (don't upload TEI-XML to S3)")
+ sub_persist_pdftext.set_defaults(func=run_persist_pdftext)
+
+ sub_persist_thumbnail = subparsers.add_parser('persist-thumbnail',
+ help="daemon that consumes thumbnail output from Kafka and pushes to minio and postgres")
+ sub_persist_thumbnail.set_defaults(func=run_persist_thumbnail)
+
sub_persist_pdftrio = subparsers.add_parser('persist-pdftrio',
help="daemon that consumes pdftrio output from Kafka and pushes to postgres")
sub_persist_pdftrio.set_defaults(func=run_persist_pdftrio)