Diffstat (limited to 'python'):

  -rwxr-xr-x  python/persist_tool.py           30
  -rw-r--r--  python/sandcrawler/__init__.py    4
  -rw-r--r--  python/sandcrawler/persist.py    99
  -rwxr-xr-x  python/sandcrawler_worker.py     83

4 files changed, 214 insertions(+), 2 deletions(-)
diff --git a/python/persist_tool.py b/python/persist_tool.py
index 869af06..4d78314 100755
--- a/python/persist_tool.py
+++ b/python/persist_tool.py
@@ -75,6 +75,23 @@ def run_pdftrio(args):
     )
     pusher.run()
 
+def run_pdftext(args):
+    worker = PersistPdfTextWorker(
+        db_url=args.db_url,
+        s3_url=args.s3_url,
+        s3_bucket=args.s3_bucket,
+        s3_access_key=args.s3_access_key,
+        s3_secret_key=args.s3_secret_key,
+        s3_only=args.s3_only,
+        db_only=args.db_only,
+    )
+    pusher = JsonLinePusher(
+        worker,
+        args.json_file,
+        batch_size=50,
+    )
+    pusher.run()
+
 def run_ingest_file_result(args):
     worker = PersistIngestFileResultWorker(
         db_url=args.db_url,
@@ -140,6 +157,19 @@ def main():
         action='store_true',
         help="only write status to sandcrawler-db (don't save TEI-XML to S3)")
 
+    sub_pdftext = subparsers.add_parser('pdftext',
+        help="backfill a pdftext JSON ('pg') dump into postgresql and s3 (minio)")
+    sub_pdftext.set_defaults(func=run_pdftext)
+    sub_pdftext.add_argument('json_file',
+        help="pdftext file to import from (or '-' for stdin)",
+        type=argparse.FileType('r'))
+    sub_pdftext.add_argument('--s3-only',
+        action='store_true',
+        help="only upload text to S3 (don't write to database)")
+    sub_pdftext.add_argument('--db-only',
+        action='store_true',
+        help="only write status to sandcrawler-db (don't save text to S3)")
+
     sub_grobid_disk = subparsers.add_parser('grobid-disk',
         help="dump GROBID output to (local) files on disk")
     sub_grobid_disk.set_defaults(func=run_grobid_disk)
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py
index 2e5efd7..a01d1f8 100644
--- a/python/sandcrawler/__init__.py
+++ b/python/sandcrawler/__init__.py
@@ -5,6 +5,6 @@ from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime
 from .workers import KafkaSink, KafkaCompressSink, JsonLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper
 from .ia import WaybackClient, WaybackError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, PetaboxError, ResourceResult, WarcResource, CdxPartial, CdxRow
 from .ingest import IngestFileWorker
-from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker, PersistIngestRequestWorker
+from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker, PersistIngestRequestWorker, PersistPdfTextWorker, PersistThumbnailWorker
 from .db import SandcrawlerPostgrestClient, SandcrawlerPostgresClient
-from .pdf import PdfExtractWorker
+from .pdf import PdfExtractWorker, PdfExtractBlobWorker
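The new `pdftext` subcommand wires PersistPdfTextWorker to a JSON-lines dump
through JsonLinePusher, exactly as run_pdftext() does above. A minimal sketch
of the equivalent direct invocation; the DSN, bucket name, and credentials are
placeholders, not values from this commit:

    # Equivalent of: ./persist_tool.py pdftext pdftext_dump.json
    from sandcrawler import PersistPdfTextWorker, JsonLinePusher

    worker = PersistPdfTextWorker(
        db_url="postgres:///sandcrawler",  # placeholder DSN
        s3_url="localhost:9000",
        s3_bucket="sandcrawler",           # placeholder bucket
        s3_access_key="minioadmin",        # placeholder credentials
        s3_secret_key="minioadmin",
        s3_only=False,
        db_only=False,
    )
    pusher = JsonLinePusher(worker, open("pdftext_dump.json"), batch_size=50)
    pusher.run()

The batch_size=50 matches the subcommand's hard-coded batching; per the worker
docstring below, each batch is persisted as a single transaction.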
+ """ + + def __init__(self, db_url, **kwargs): + super().__init__() + self.db = SandcrawlerPostgresClient(db_url) + self.cur = self.db.conn.cursor() + self.s3 = SandcrawlerMinioClient( + host_url=kwargs.get('s3_url', 'localhost:9000'), + access_key=kwargs['s3_access_key'], + secret_key=kwargs['s3_secret_key'], + default_bucket=kwargs['s3_bucket'], + ) + self.s3_only = kwargs.get('s3_only', False) + self.db_only = kwargs.get('db_only', False) + assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed" + + def process(self, record, key=None): + """ + Only do batches (as transactions) + """ + raise NotImplementedError + + def push_batch(self, batch): + self.counts['total'] += len(batch) + + for r in batch: + if r['status'] != 'success' or not r.get('text'): + self.counts['s3-skip-status'] += 1 + if r.get('error_msg'): + r['metadata'] = {'error_msg': r['error_msg'][:500]} + continue + + assert len(r['sha1hex']) == 40 + if not self.db_only: + resp = self.s3.put_blob( + folder="text", + blob=r['text'], + sha1hex=r['sha1hex'], + extension=".txt", + ) + self.counts['s3-put'] += 1 + + if not self.s3_only: + resp = self.db.insert_pdf_meta(self.cur, batch, on_conflict="update") + self.counts['insert-pdf-meta'] += resp[0] + self.counts['update-pdf-meta'] += resp[1] + + file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')] + resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update") + self.counts['insert-file-meta'] += resp[0] + self.counts['update-file-meta'] += resp[1] + + self.db.commit() + + return [] + + +class PersistThumbnailWorker(SandcrawlerWorker): + """ + Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table. + + This worker *must* be used with raw kakfa mode. 
+ """ + + def __init__(self, **kwargs): + super().__init__() + self.s3 = SandcrawlerMinioClient( + host_url=kwargs.get('s3_url', 'localhost:9000'), + access_key=kwargs['s3_access_key'], + secret_key=kwargs['s3_secret_key'], + default_bucket=kwargs['s3_bucket'], + ) + self.s3_extension = kwargs.get('s3_extension', ".jpg") + + def process(self, blob, key=None): + """ + Processing raw messages, not decoded JSON objects + """ + + assert key is not None and len(key) == 40 + assert isinstance(blob, bytes) + assert len(blob) >= 50 + + resp = self.s3.put_blob( + folder="thumbnail", + blob=blob, + sha1hex=key, + extension=self.s3_extension, + ) + self.counts['s3-put'] += 1 + return True + diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 5720f48..950eb4b 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -70,6 +70,75 @@ def run_persist_grobid(args): ) pusher.run() +def run_pdf_extract(args): + consume_topic = "sandcrawler-{}.unextracted-pg".format(args.env) + text_topic = "sandcrawler-{}.pdftext".format(args.env) + thumbnail_topic = "sandcrawler-{}.thumbnail-180px-jpeg".format(args.env) + text_sink = KafkaSink( + kafka_hosts=args.kafka_hosts, + produce_topic=text_topic, + ) + thumbnail_sink = KafkaSink( + kafka_hosts=args.kafka_hosts, + produce_topic=thumbnail_topic, + ) + wayback_client = WaybackClient( + host_url=args.grobid_host, + ) + worker = PdfExtractWorker( + wayback_client=wayback_client, + sink=text_sink, + thumbnail_sink=thumbnail_sink, + ) + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="pdf-extract", + batch_size=1, + ) + pusher.run() + +def run_persist_pdftext(args): + consume_topic = "sandcrawler-{}.pdftext".format(args.env) + worker = PersistPdfTextWorker( + db_url=args.db_url, + s3_url=args.s3_url, + s3_bucket=args.s3_bucket, + s3_access_key=args.s3_access_key, + s3_secret_key=args.s3_secret_key, + s3_only=args.s3_only, + db_only=args.db_only, + ) + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="persist-pdftext", + push_batches=True, + batch_size=25, + ) + pusher.run() + +def run_persist_thumbnail(args): + consume_topic = "sandcrawler-{}.thumbnail".format(args.env) + worker = PersistThumbnailWorker( + s3_url=args.s3_url, + s3_bucket=args.s3_bucket, + s3_access_key=args.s3_access_key, + s3_secret_key=args.s3_secret_key, + # TODO: s3_extension=args.s3_extension, + ) + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="persist-thumbnail", + raw_records=True, + batch_size=25, + ) + pusher.run() + def run_persist_pdftrio(args): consume_topic = "sandcrawler-{}.pdftrio-output".format(args.env) worker = PersistPdfTrioWorker( @@ -179,6 +248,20 @@ def main(): help="only write status to database (don't upload TEI-XML to S3)") sub_persist_grobid.set_defaults(func=run_persist_grobid) + sub_persist_pdftext = subparsers.add_parser('persist-pdftext', + help="daemon that consumes pdftext output from Kafka and pushes to minio and postgres") + sub_persist_pdftext.add_argument('--s3-only', + action='store_true', + help="only upload TEI-XML to S3 (don't write to database)") + sub_persist_pdftext.add_argument('--db-only', + action='store_true', + help="only write status to database (don't upload TEI-XML to S3)") + sub_persist_pdftext.set_defaults(func=run_persist_pdftext) + + sub_persist_thumbnail = subparsers.add_parser('persist-thumbnail', + 
help="daemon that consumes thumbnail output from Kafka and pushes to minio and postgres") + sub_persist_thumbnail.set_defaults(func=run_persist_thumbnail) + sub_persist_pdftrio = subparsers.add_parser('persist-pdftrio', help="daemon that consumes pdftrio output from Kafka and pushes to postgres") sub_persist_pdftrio.set_defaults(func=run_persist_pdftrio) |