From b4a40d99b23a83eabeed490c0dce52dba31dc7b8 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 17 Jun 2020 21:23:08 -0700 Subject: fixes and tweaks from testing locally --- python/sandcrawler/persist.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) (limited to 'python/sandcrawler/persist.py') diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index 8d421ad..6d9298e 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -20,12 +20,14 @@ grobid """ import os +from typing import Optional import xml.etree.ElementTree from sandcrawler.workers import SandcrawlerWorker from sandcrawler.db import SandcrawlerPostgresClient from sandcrawler.minio import SandcrawlerMinioClient from sandcrawler.grobid import GrobidClient +from sandcrawler.pdfextract import PdfExtractResult class PersistCdxWorker(SandcrawlerWorker): @@ -404,29 +406,33 @@ class PersistPdfTextWorker(SandcrawlerWorker): def push_batch(self, batch): self.counts['total'] += len(batch) + parsed_batch = [] for r in batch: - if r['status'] != 'success' or not r.get('text'): + parsed_batch.append(PdfExtractResult.from_pdftext_dict(r)) + + for r in parsed_batch: + if r.status != 'success' or not r.text: self.counts['s3-skip-status'] += 1 - if r.get('error_msg'): - r['metadata'] = {'error_msg': r['error_msg'][:500]} + if r.error_msg: + r.metadata = {'error_msg': r.error_msg[:500]} continue - assert len(r['sha1hex']) == 40 + assert len(r.sha1hex) == 40 if not self.db_only: resp = self.s3.put_blob( folder="text", - blob=r['text'], - sha1hex=r['sha1hex'], + blob=r.text, + sha1hex=r.sha1hex, extension=".txt", ) self.counts['s3-put'] += 1 if not self.s3_only: - resp = self.db.insert_pdf_meta(self.cur, batch, on_conflict="update") + resp = self.db.insert_pdf_meta(self.cur, parsed_batch, on_conflict="update") self.counts['insert-pdf-meta'] += resp[0] self.counts['update-pdf-meta'] += resp[1] - file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')] + file_meta_batch = [r.file_meta for r in parsed_batch if r.file_meta] resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update") self.counts['insert-file-meta'] += resp[0] self.counts['update-file-meta'] += resp[1] @@ -454,12 +460,14 @@ class PersistThumbnailWorker(SandcrawlerWorker): self.s3_extension = kwargs.get('s3_extension', ".jpg") self.s3_folder = kwargs.get('s3_folder', "pdf") - def process(self, blob, key=None): + def process(self, blob: bytes, key: Optional[str] = None): """ Processing raw messages, not decoded JSON objects """ - assert key is not None and len(key) == 40 + if isinstance(key, bytes): + key = key.decode('utf-8') + assert key is not None and len(key) == 40 and isinstance(key, str) assert isinstance(blob, bytes) assert len(blob) >= 50 @@ -470,5 +478,4 @@ class PersistThumbnailWorker(SandcrawlerWorker): extension=self.s3_extension, ) self.counts['s3-put'] += 1 - return True -- cgit v1.2.3