about | summary | refs | log | tree | commit | diff | stats
path: root/python/sandcrawler
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler')
-rw-r--r-- python/sandcrawler/__init__.py 4
-rw-r--r-- python/sandcrawler/persist.py 99
2 files changed, 101 insertions, 2 deletions
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py
index 2e5efd7..a01d1f8 100644
--- a/python/sandcrawler/__init__.py
+++ b/python/sandcrawler/__init__.py
@@ -5,6 +5,6 @@ from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime
from .workers import KafkaSink, KafkaCompressSink, JsonLinePusher, CdxLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper
from .ia import WaybackClient, WaybackError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, PetaboxError, ResourceResult, WarcResource, CdxPartial, CdxRow
from .ingest import IngestFileWorker
-from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker, PersistIngestRequestWorker
+from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker, PersistIngestRequestWorker, PersistPdfTextWorker, PersistThumbnailWorker
from .db import SandcrawlerPostgrestClient, SandcrawlerPostgresClient
-from .pdf import PdfExtractWorker
+from .pdf import PdfExtractWorker, PdfExtractBlobWorker
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 338cdfc..196c4b9 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -372,3 +372,102 @@ class PersistPdfTrioWorker(SandcrawlerWorker):
self.db.commit()
return []
+
+
+class PersistPdfTextWorker(SandcrawlerWorker):
+ """
+ Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table.
+
+ Should keep batch sizes small.
+ """
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+ self.s3 = SandcrawlerMinioClient(
+ host_url=kwargs.get('s3_url', 'localhost:9000'),
+ access_key=kwargs['s3_access_key'],
+ secret_key=kwargs['s3_secret_key'],
+ default_bucket=kwargs['s3_bucket'],
+ )
+ self.s3_only = kwargs.get('s3_only', False)
+ self.db_only = kwargs.get('db_only', False)
+ assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed"
+
+ def process(self, record, key=None):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+
+ for r in batch:
+ if r['status'] != 'success' or not r.get('text'):
+ self.counts['s3-skip-status'] += 1
+ if r.get('error_msg'):
+ r['metadata'] = {'error_msg': r['error_msg'][:500]}
+ continue
+
+ assert len(r['sha1hex']) == 40
+ if not self.db_only:
+ resp = self.s3.put_blob(
+ folder="text",
+ blob=r['text'],
+ sha1hex=r['sha1hex'],
+ extension=".txt",
+ )
+ self.counts['s3-put'] += 1
+
+ if not self.s3_only:
+ resp = self.db.insert_pdf_meta(self.cur, batch, on_conflict="update")
+ self.counts['insert-pdf-meta'] += resp[0]
+ self.counts['update-pdf-meta'] += resp[1]
+
+ file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
+ resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update")
+ self.counts['insert-file-meta'] += resp[0]
+ self.counts['update-file-meta'] += resp[1]
+
+ self.db.commit()
+
+ return []
+
+
+class PersistThumbnailWorker(SandcrawlerWorker):
+ """
+ Pushes thumbnail image blobs to blob store (S3/seaweed/minio); does not write to the SQL database.
+
+ This worker *must* be used with raw Kafka mode.
+ """
+
+ def __init__(self, **kwargs):
+ super().__init__()
+ self.s3 = SandcrawlerMinioClient(
+ host_url=kwargs.get('s3_url', 'localhost:9000'),
+ access_key=kwargs['s3_access_key'],
+ secret_key=kwargs['s3_secret_key'],
+ default_bucket=kwargs['s3_bucket'],
+ )
+ self.s3_extension = kwargs.get('s3_extension', ".jpg")
+
+ def process(self, blob, key=None):
+ """
+ Processing raw messages, not decoded JSON objects
+ """
+
+ assert key is not None and len(key) == 40
+ assert isinstance(blob, bytes)
+ assert len(blob) >= 50
+
+ resp = self.s3.put_blob(
+ folder="thumbnail",
+ blob=blob,
+ sha1hex=key,
+ extension=self.s3_extension,
+ )
+ self.counts['s3-put'] += 1
+ return True
+