diff options
Diffstat (limited to 'python/sandcrawler/persist.py')
-rw-r--r-- | python/sandcrawler/persist.py | 99 |
1 files changed, 99 insertions, 0 deletions
# Source hunk: python/sandcrawler/persist.py, @@ -372,3 +372,102 @@
# The two classes below are appended after PersistPdfTrioWorker.


class PersistPdfTextWorker(SandcrawlerWorker):
    """
    Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table.

    Should keep batch sizes small.
    """

    def __init__(self, db_url, **kwargs):
        """
        Set up the SQL and blob store clients.

        Keyword arguments read here:

        - ``s3_url``: blob store host (defaults to ``'localhost:9000'``)
        - ``s3_access_key``, ``s3_secret_key``, ``s3_bucket``: required;
          a missing one raises ``KeyError``
        - ``s3_only``: if true, skip all SQL writes in ``push_batch``
        - ``db_only``: if true, skip all blob uploads in ``push_batch``

        ``s3_only`` and ``db_only`` are mutually exclusive.
        """
        super().__init__()
        self.s3_only = kwargs.get('s3_only', False)
        self.db_only = kwargs.get('db_only', False)
        # Validate the flags before opening any connection, so that a bad
        # configuration fails without leaving a database connection behind.
        assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed"
        self.db = SandcrawlerPostgresClient(db_url)
        self.cur = self.db.conn.cursor()
        self.s3 = SandcrawlerMinioClient(
            host_url=kwargs.get('s3_url', 'localhost:9000'),
            access_key=kwargs['s3_access_key'],
            secret_key=kwargs['s3_secret_key'],
            default_bucket=kwargs['s3_bucket'],
        )

    def process(self, record, key=None):
        """
        Only do batches (as transactions)
        """
        raise NotImplementedError

    def push_batch(self, batch):
        """
        Persist one batch of PDF text extraction records.

        Records whose ``status`` is not ``'success'``, or which have no
        ``text``, are not uploaded; if they carry an ``error_msg`` their
        ``metadata`` is replaced with that message truncated to 500
        characters. All other records have their text uploaded under the
        "text" folder with a ".txt" extension, unless ``db_only`` is set.

        Unless ``s3_only`` is set, the *whole* batch (including skipped
        records) is then upserted as PDF metadata, followed by any
        ``file_meta`` sub-records, and the transaction is committed.

        Always returns an empty list.
        """
        self.counts['total'] += len(batch)

        for r in batch:
            if r['status'] != 'success' or not r.get('text'):
                self.counts['s3-skip-status'] += 1
                if r.get('error_msg'):
                    # Keep only a bounded error message for the SQL row.
                    r['metadata'] = {'error_msg': r['error_msg'][:500]}
                continue

            assert len(r['sha1hex']) == 40
            if not self.db_only:
                self.s3.put_blob(
                    folder="text",
                    blob=r['text'],
                    sha1hex=r['sha1hex'],
                    extension=".txt",
                )
                self.counts['s3-put'] += 1

        if not self.s3_only:
            # NOTE(review): resp is indexed as (inserted, updated) counts,
            # judging by the counter names -- confirm against the DB client.
            resp = self.db.insert_pdf_meta(self.cur, batch, on_conflict="update")
            self.counts['insert-pdf-meta'] += resp[0]
            self.counts['update-pdf-meta'] += resp[1]

            file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
            resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update")
            self.counts['insert-file-meta'] += resp[0]
            self.counts['update-file-meta'] += resp[1]

            self.db.commit()

        return []


class PersistThumbnailWorker(SandcrawlerWorker):
    """
    Pushes thumbnail image blobs to blob store (S3/seaweed/minio).

    This worker *must* be used with raw kafka mode.
    """

    def __init__(self, **kwargs):
        """
        Set up the blob store client.

        Keyword arguments read here:

        - ``s3_url``: blob store host (defaults to ``'localhost:9000'``)
        - ``s3_access_key``, ``s3_secret_key``, ``s3_bucket``: required;
          a missing one raises ``KeyError``
        - ``s3_extension``: file extension for uploaded blobs
          (defaults to ``".jpg"``)
        """
        super().__init__()
        self.s3 = SandcrawlerMinioClient(
            host_url=kwargs.get('s3_url', 'localhost:9000'),
            access_key=kwargs['s3_access_key'],
            secret_key=kwargs['s3_secret_key'],
            default_bucket=kwargs['s3_bucket'],
        )
        self.s3_extension = kwargs.get('s3_extension', ".jpg")

    def process(self, blob, key=None):
        """
        Processing raw messages, not decoded JSON objects

        ``key`` must be a 40-character string; it is passed to the blob
        store as the ``sha1hex`` of the object. ``blob`` must be ``bytes``
        of at least 50 bytes. The blob is uploaded under the "thumbnail"
        folder using the configured extension.

        Returns True after the upload.
        """

        assert key is not None and len(key) == 40
        assert isinstance(blob, bytes)
        assert len(blob) >= 50

        self.s3.put_blob(
            folder="thumbnail",
            blob=blob,
            sha1hex=key,
            extension=self.s3_extension,
        )
        self.counts['s3-put'] += 1
        return True