aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/persist.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/persist.py')
-rw-r--r--python/sandcrawler/persist.py99
1 files changed, 99 insertions, 0 deletions
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 338cdfc..196c4b9 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -372,3 +372,102 @@ class PersistPdfTrioWorker(SandcrawlerWorker):
self.db.commit()
return []
+
+
+class PersistPdfTextWorker(SandcrawlerWorker):
+ """
+ Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table.
+
+ Should keep batch sizes small.
+ """
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+ self.s3 = SandcrawlerMinioClient(
+ host_url=kwargs.get('s3_url', 'localhost:9000'),
+ access_key=kwargs['s3_access_key'],
+ secret_key=kwargs['s3_secret_key'],
+ default_bucket=kwargs['s3_bucket'],
+ )
+ self.s3_only = kwargs.get('s3_only', False)
+ self.db_only = kwargs.get('db_only', False)
+ assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed"
+
+ def process(self, record, key=None):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+
+ for r in batch:
+ if r['status'] != 'success' or not r.get('text'):
+ self.counts['s3-skip-status'] += 1
+ if r.get('error_msg'):
+ r['metadata'] = {'error_msg': r['error_msg'][:500]}
+ continue
+
+ assert len(r['sha1hex']) == 40
+ if not self.db_only:
+ resp = self.s3.put_blob(
+ folder="text",
+ blob=r['text'],
+ sha1hex=r['sha1hex'],
+ extension=".txt",
+ )
+ self.counts['s3-put'] += 1
+
+ if not self.s3_only:
+ resp = self.db.insert_pdf_meta(self.cur, batch, on_conflict="update")
+ self.counts['insert-pdf-meta'] += resp[0]
+ self.counts['update-pdf-meta'] += resp[1]
+
+ file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
+ resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update")
+ self.counts['insert-file-meta'] += resp[0]
+ self.counts['update-file-meta'] += resp[1]
+
+ self.db.commit()
+
+ return []
+
+
+class PersistThumbnailWorker(SandcrawlerWorker):
+ """
+ Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table.
+
+ This worker *must* be used with raw kakfa mode.
+ """
+
+ def __init__(self, **kwargs):
+ super().__init__()
+ self.s3 = SandcrawlerMinioClient(
+ host_url=kwargs.get('s3_url', 'localhost:9000'),
+ access_key=kwargs['s3_access_key'],
+ secret_key=kwargs['s3_secret_key'],
+ default_bucket=kwargs['s3_bucket'],
+ )
+ self.s3_extension = kwargs.get('s3_extension', ".jpg")
+
+ def process(self, blob, key=None):
+ """
+ Processing raw messages, not decoded JSON objects
+ """
+
+ assert key is not None and len(key) == 40
+ assert isinstance(blob, bytes)
+ assert len(blob) >= 50
+
+ resp = self.s3.put_blob(
+ folder="thumbnail",
+ blob=blob,
+ sha1hex=key,
+ extension=self.s3_extension,
+ )
+ self.counts['s3-put'] += 1
+ return True
+